Whamcloud - gitweb
c5106ba65b390074d1534192ba3e75a2f69b6e09
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_ha.h>
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_export.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #include "ptlrpc_internal.h"
40
41 struct ptlrpc_connect_async_args {
42          __u64 pcaa_peer_committed;
43         int pcaa_initial_connect;
44 };
45
/*
 * Move @imp to @state and log the transition, unless the import is already
 * CLOSED: a CLOSED import should remain so.
 *
 * This variant takes no lock itself; IMPORT_SET_STATE wraps it in
 * imp->imp_lock.  Both parameters are parenthesised so that any expression
 * (e.g. "&some_import") may be passed.  Note that @imp and @state are still
 * evaluated more than once, so arguments must not have side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)
57
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp->imp_lock around the
 * state change.  Must not be used while imp_lock is already held.
 *
 * The parameters are parenthesised here and when forwarded, so expressions
 * such as "&some_import" expand correctly.  @imp is evaluated several times;
 * pass only side-effect-free arguments.
 */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while (0)
64
65
66 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
67                                     void * data, int rc);
68 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         spin_lock(&imp->imp_lock);
78
79         imp->imp_generation++;
80         imp->imp_state =  LUSTRE_IMP_NEW;
81
82         spin_unlock(&imp->imp_lock);
83
84         return 0;
85 }
86 EXPORT_SYMBOL(ptlrpc_init_import);
87
#define UUID_STR "_UUID"
/*
 * Find the "bare" name inside a UUID string without modifying the string.
 *
 * @uuid       NUL-terminated string to inspect
 * @prefix     optional leading text to skip; ignored when NULL or when
 *             @uuid does not start with it
 * @uuid_start out: points into @uuid, just past @prefix if it matched,
 *             otherwise at @uuid itself
 * @uuid_len   out: length of the name starting at *@uuid_start, not
 *             counting a trailing UUID_STR suffix if one is present
 *
 * All length arithmetic is done in size_t so that no signed value is ever
 * compared against a strlen() result; the lengths are computed only once.
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = sizeof(UUID_STR) - 1;
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start += prefix_len;
        }

        len = strlen(start);

        /* Drop the suffix only when the remaining name actually ends in it */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = (int)len;
}
104
105 /* Returns true if import was FULL, false if import was already not
106  * connected.
107  * @imp - import to be disconnected
108  * @conn_cnt - connection count (epoch) of the request that timed out
109  *             and caused the disconnection.  In some cases, multiple
110  *             inflight requests can fail to a single target (e.g. OST
111  *             bulk requests) and if one has already caused a reconnection
112  *             (increasing the import->conn_cnt) the older failure should
113  *             not also cause a reconnection.  If zero it forces a reconnect.
114  */
115 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
116 {
117         int rc = 0;
118
119         spin_lock(&imp->imp_lock);
120
121         if (imp->imp_state == LUSTRE_IMP_FULL &&
122             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
123                 char *target_start;
124                 int   target_len;
125
126                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
127                           &target_start, &target_len);
128                 if (imp->imp_replayable) {
129                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
130                                "%s was lost; in progress operations using this "
131                                "service will wait for recovery to complete.\n",
132                                imp->imp_obd->obd_name, target_len, target_start,
133                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
136                                "%.*s via nid %s was lost; in progress "
137                                "operations using this service will fail.\n",
138                                imp->imp_obd->obd_name, target_len, target_start, 
139                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
140                 }
141                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
142                 spin_unlock(&imp->imp_lock);
143     
144                 if (obd_dump_on_timeout)
145                         libcfs_debug_dumplog();
146
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock(&imp->imp_lock);
151                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
152                        imp->imp_client->cli_name, imp,
153                        (imp->imp_state == LUSTRE_IMP_FULL &&
154                         imp->imp_conn_cnt > conn_cnt) ?
155                        "reconnected" : "not connected", imp->imp_conn_cnt,
156                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
157         }
158
159         return rc;
160 }
161
162 /*
163  * This acts as a barrier; all existing requests are rejected, and
164  * no new requests will be accepted until the import is valid again.
165  */
166 void ptlrpc_deactivate_import(struct obd_import *imp)
167 {
168         ENTRY;
169
170         spin_lock(&imp->imp_lock);
171         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
172         imp->imp_invalid = 1;
173         imp->imp_generation++;
174         spin_unlock(&imp->imp_lock);
175
176         ptlrpc_abort_inflight(imp);
177         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
178 }
179
180 /*
181  * This function will invalidate the import, if necessary, then block
182  * for all the RPC completions, and finally notify the obd to
183  * invalidate its state (ie cancel locks, clear pending requests,
184  * etc).
185  */
186 void ptlrpc_invalidate_import(struct obd_import *imp)
187 {
188         struct list_head *tmp, *n;
189         struct ptlrpc_request *req;
190         struct l_wait_info lwi;
191         time_t last = 0;
192         int timeout, rc = 0;
193
194         atomic_inc(&imp->imp_inval_count);
195
196         if (!imp->imp_invalid)
197                 ptlrpc_deactivate_import(imp);
198
199         LASSERT(imp->imp_invalid);
200
201         /* wait for all requests to error out and call completion callbacks */
202         spin_lock(&imp->imp_lock);
203         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
204                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
205                 last = max(last, req->rq_deadline);
206         }
207         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
208                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
209                 last = max(last, req->rq_deadline);
210         }
211         spin_unlock(&imp->imp_lock);
212
213         timeout = (int)(last - cfs_time_current_sec());
214         if (timeout > 0) {
215                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(timeout),
216                                            HZ, NULL, NULL);
217                 rc = l_wait_event(imp->imp_recovery_waitq,
218                                   (atomic_read(&imp->imp_inflight) == 0),
219                                   &lwi);
220         }
221
222         if (atomic_read(&imp->imp_inflight)) {
223                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
224                        obd2cli_tgt(imp->imp_obd), rc,
225                        atomic_read(&imp->imp_inflight));
226                 spin_lock(&imp->imp_lock);
227                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
228                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
229                         DEBUG_REQ(D_ERROR, req, "still on sending list");
230                 }
231                 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
232                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
233                         DEBUG_REQ(D_ERROR, req, "still on delayed list");
234                 }
235                 spin_unlock(&imp->imp_lock);
236         }
237
238         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
239
240         atomic_dec(&imp->imp_inval_count);
241         cfs_waitq_signal(&imp->imp_recovery_waitq);
242 }
243
244 /* unset imp_invalid */
245 void ptlrpc_activate_import(struct obd_import *imp)
246 {
247         struct obd_device *obd = imp->imp_obd;
248
249         spin_lock(&imp->imp_lock);
250         imp->imp_invalid = 0;
251         spin_unlock(&imp->imp_lock);
252
253         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
254 }
255
256 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
257 {
258         ENTRY;
259
260         LASSERT(!imp->imp_dlm_fake);
261
262         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
263                 if (!imp->imp_replayable) {
264                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
265                                "auto-deactivating\n",
266                                obd2cli_tgt(imp->imp_obd),
267                                imp->imp_connection->c_remote_uuid.uuid,
268                                imp->imp_obd->obd_name);
269                         ptlrpc_deactivate_import(imp);
270                 }
271
272                 CDEBUG(D_HA, "%s: waking up pinger\n",
273                        obd2cli_tgt(imp->imp_obd));
274
275                 spin_lock(&imp->imp_lock);
276                 imp->imp_force_verify = 1;
277                 spin_unlock(&imp->imp_lock);
278
279                 ptlrpc_pinger_wake_up();
280         }
281         EXIT;
282 }
283
284 static int import_select_connection(struct obd_import *imp)
285 {
286         struct obd_import_conn *imp_conn = NULL, *conn;
287         struct obd_export *dlmexp;
288         int tried_all = 1;
289         ENTRY;
290
291         spin_lock(&imp->imp_lock);
292
293         if (list_empty(&imp->imp_conn_list)) {
294                 CERROR("%s: no connections available\n",
295                         imp->imp_obd->obd_name);
296                 spin_unlock(&imp->imp_lock);
297                 RETURN(-EINVAL);
298         }
299
300         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
301                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
302                        imp->imp_obd->obd_name,
303                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
304                        conn->oic_last_attempt);
305                 
306                 /* Don't thrash connections */
307                 if (cfs_time_before_64(cfs_time_current_64(),
308                                      conn->oic_last_attempt + 
309                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
310                         continue;
311                 }
312
313                 /* If we have not tried this connection since the
314                    the last successful attempt, go with this one */
315                 if ((conn->oic_last_attempt == 0) ||
316                     cfs_time_beforeq_64(conn->oic_last_attempt,
317                                        imp->imp_last_success_conn)) {
318                         imp_conn = conn;
319                         tried_all = 0;
320                         break;
321                 }
322
323                 /* If all of the connections have already been tried
324                    since the last successful connection; just choose the
325                    least recently used */
326                 if (!imp_conn)
327                         imp_conn = conn;
328                 else if (cfs_time_before_64(conn->oic_last_attempt,
329                                             imp_conn->oic_last_attempt))
330                         imp_conn = conn;
331         }
332
333         /* if not found, simply choose the current one */
334         if (!imp_conn) {
335                 LASSERT(imp->imp_conn_current);
336                 imp_conn = imp->imp_conn_current;
337                 tried_all = 0;
338         }
339         LASSERT(imp_conn->oic_conn);
340
341         /* If we've tried everything, and we're back to the beginning of the
342            list, wait for LND_TIMEOUT to give the queues a chance to drain. */
343         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item)) {
344                 int must_wait;
345                 LASSERT(imp_conn->oic_last_attempt);
346                 must_wait = LND_TIMEOUT -
347                         (int)cfs_duration_sec(cfs_time_current_64() - 
348                                               imp_conn->oic_last_attempt);
349                 imp->imp_at.iat_drain = max(0, must_wait);
350                 CWARN("Tried all connections, %lus drain time\n",
351                       imp->imp_at.iat_drain);
352         } else {
353                 imp->imp_at.iat_drain = 0;
354         }
355
356         imp_conn->oic_last_attempt = cfs_time_current_64();
357
358         /* switch connection, don't mind if it's same as the current one */
359         if (imp->imp_connection)
360                 ptlrpc_put_connection(imp->imp_connection);
361         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
362
363         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
364         LASSERT(dlmexp != NULL);
365         if (dlmexp->exp_connection)
366                 ptlrpc_put_connection(dlmexp->exp_connection);
367         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
368         class_export_put(dlmexp);
369
370         if (imp->imp_conn_current != imp_conn) {
371                 if (imp->imp_conn_current)
372                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
373                                       imp->imp_obd->obd_name,
374                                       imp_conn->oic_uuid.uuid,
375                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
376                 imp->imp_conn_current = imp_conn;
377         }
378
379         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
380                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
381                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
382
383         spin_unlock(&imp->imp_lock);
384
385         RETURN(0);
386 }
387
388 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
389 {
390         struct obd_device *obd = imp->imp_obd;
391         int initial_connect = 0;
392         int rc;
393         __u64 committed_before_reconnect = 0;
394         struct ptlrpc_request *request;
395         int size[] = { sizeof(struct ptlrpc_body),
396                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
397                        sizeof(obd->obd_uuid),
398                        sizeof(imp->imp_dlm_handle),
399                        sizeof(imp->imp_connect_data) };
400         char *tmp[] = { NULL,
401                         obd2cli_tgt(imp->imp_obd),
402                         obd->obd_uuid.uuid,
403                         (char *)&imp->imp_dlm_handle,
404                         (char *)&imp->imp_connect_data };
405         struct ptlrpc_connect_async_args *aa;
406
407         ENTRY;
408         spin_lock(&imp->imp_lock);
409         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
410                 spin_unlock(&imp->imp_lock);
411                 CERROR("can't connect to a closed import\n");
412                 RETURN(-EINVAL);
413         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
414                 spin_unlock(&imp->imp_lock);
415                 CERROR("already connected\n");
416                 RETURN(0);
417         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
418                 spin_unlock(&imp->imp_lock);
419                 CERROR("already connecting\n");
420                 RETURN(-EALREADY);
421         }
422
423         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
424
425         imp->imp_conn_cnt++;
426         imp->imp_resend_replay = 0;
427
428         if (!lustre_handle_is_used(&imp->imp_remote_handle))
429                 initial_connect = 1;
430         else
431                 committed_before_reconnect = imp->imp_peer_committed_transno;
432
433         spin_unlock(&imp->imp_lock);
434
435         if (new_uuid) {
436                 struct obd_uuid uuid;
437
438                 obd_str2uuid(&uuid, new_uuid);
439                 rc = import_set_conn_priority(imp, &uuid);
440                 if (rc)
441                         GOTO(out, rc);
442         }
443
444         rc = import_select_connection(imp);
445         if (rc)
446                 GOTO(out, rc);
447
448         /* last in connection list */
449         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
450                 if (imp->imp_initial_recov_bk && initial_connect) {
451                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
452                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
453                         /* Don't retry if connect fails */
454                         rc = 0;
455                         obd_set_info_async(obd->obd_self_export,
456                                            strlen(KEY_INIT_RECOV),
457                                            KEY_INIT_RECOV,
458                                            sizeof(rc), &rc, NULL);
459                 }
460                 if (imp->imp_recon_bk) {
461                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
462                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
463                         spin_lock(&imp->imp_lock);
464                         imp->imp_last_recon = 1;
465                         spin_unlock(&imp->imp_lock);
466                 }
467         }
468
469         /* Reset connect flags to the originally requested flags, in case
470          * the server is updated on-the-fly we will get the new features. */
471         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
472         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
473                            &obd->obd_uuid, &imp->imp_connect_data);
474         if (rc)
475                 GOTO(out, rc);
476
477         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
478                                   5, size, tmp);
479         if (!request)
480                 GOTO(out, rc = -ENOMEM);
481
482 #ifndef __KERNEL__
483         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
484 #endif
485         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
486
487         request->rq_send_state = LUSTRE_IMP_CONNECTING;
488         /* Allow a slightly larger reply for future growth compatibility */
489         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
490                               16 * sizeof(__u64);
491         ptlrpc_req_set_repsize(request, 2, size);
492         request->rq_interpret_reply = ptlrpc_connect_interpret;
493
494         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
495         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
496         memset(aa, 0, sizeof *aa);
497
498         aa->pcaa_peer_committed = committed_before_reconnect;
499         aa->pcaa_initial_connect = initial_connect;
500         if (aa->pcaa_initial_connect) {
501                 spin_lock(&imp->imp_lock);
502                 imp->imp_replayable = 1;
503                 spin_unlock(&imp->imp_lock);
504                 if (AT_OFF)
505                         /* AT will use INITIAL_CONNECT_TIMEOUT the first
506                            time, adaptive after that. */
507                         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
508         }
509
510         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
511                   aa->pcaa_initial_connect ? "initial " : "re", 
512                   imp->imp_conn_cnt);
513         ptlrpcd_add_req(request);
514         rc = 0;
515 out:
516         if (rc != 0) {
517                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
518         }
519
520         RETURN(rc);
521 }
522 EXPORT_SYMBOL(ptlrpc_connect_import);
523
524 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
525 {
526 #ifdef __KERNEL__
527         struct obd_import_conn *imp_conn;
528 #endif
529         int wake_pinger = 0;
530
531         ENTRY;
532
533         spin_lock(&imp->imp_lock);
534         if (list_empty(&imp->imp_conn_list))
535                 GOTO(unlock, 0);
536
537 #ifdef __KERNEL__
538         imp_conn = list_entry(imp->imp_conn_list.prev,
539                               struct obd_import_conn,
540                               oic_item);
541
542         if (imp->imp_conn_current != imp_conn) {
543                 ptlrpc_ping_import_soon(imp);
544                 wake_pinger = 1;
545         }
546
547 #else
548         /* liblustre has no pinger thead, so we wakup pinger anyway */
549         wake_pinger = 1;
550 #endif 
551  unlock:
552         spin_unlock(&imp->imp_lock);
553
554         if (wake_pinger)
555                 ptlrpc_pinger_wake_up();
556
557         EXIT;
558 }
559
560 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
561                                     void * data, int rc)
562 {
563         struct ptlrpc_connect_async_args *aa = data;
564         struct obd_import *imp = request->rq_import;
565         struct client_obd *cli = &imp->imp_obd->u.cli;
566         struct lustre_handle old_hdl;
567         int msg_flags;
568         ENTRY;
569
570         spin_lock(&imp->imp_lock);
571         imp->imp_at.iat_drain = 0;
572         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
573                 spin_unlock(&imp->imp_lock);
574                 RETURN(0);
575         }
576         spin_unlock(&imp->imp_lock);
577
578         if (rc)
579                 GOTO(out, rc);
580
581         LASSERT(imp->imp_conn_current);
582
583         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
584
585         /* All imports are pingable */
586         spin_lock(&imp->imp_lock);
587         imp->imp_pingable = 1;
588
589         if (aa->pcaa_initial_connect) {
590                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
591                         imp->imp_replayable = 1;
592                         spin_unlock(&imp->imp_lock);
593                         CDEBUG(D_HA, "connected to replayable target: %s\n",
594                                obd2cli_tgt(imp->imp_obd));
595                 } else {
596                         imp->imp_replayable = 0;
597                         spin_unlock(&imp->imp_lock);
598                 }
599
600                 if (msg_flags & MSG_CONNECT_NEXT_VER) {
601                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
602                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
603                                obd2cli_tgt(imp->imp_obd));
604                 } else {
605                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
606                                obd2cli_tgt(imp->imp_obd));
607                 }
608
609                 imp->imp_remote_handle =
610                                 *lustre_msg_get_handle(request->rq_repmsg);
611
612                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
613                 GOTO(finish, rc = 0);
614         } else {
615                 spin_unlock(&imp->imp_lock);
616         }
617
618         /* Determine what recovery state to move the import to. */
619         if (MSG_CONNECT_RECONNECT & msg_flags) {
620                 memset(&old_hdl, 0, sizeof(old_hdl));
621                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
622                             sizeof (old_hdl))) {
623                         CERROR("%s@%s didn't like our handle "LPX64
624                                ", failed\n", obd2cli_tgt(imp->imp_obd),
625                                imp->imp_connection->c_remote_uuid.uuid,
626                                imp->imp_dlm_handle.cookie);
627                         GOTO(out, rc = -ENOTCONN);
628                 }
629
630                 if (memcmp(&imp->imp_remote_handle,
631                            lustre_msg_get_handle(request->rq_repmsg),
632                            sizeof(imp->imp_remote_handle))) {
633                         int level = D_ERROR;
634                         /* Old MGC can reconnect to a restarted MGS */
635                         if (strcmp(imp->imp_obd->obd_type->typ_name,
636                                    LUSTRE_MGC_NAME) == 0) {
637                                 level = D_CONFIG;
638                         }
639                         CDEBUG(level, 
640                                "%s@%s changed handle from "LPX64" to "LPX64
641                                "; copying, but this may foreshadow disaster\n",
642                                obd2cli_tgt(imp->imp_obd),
643                                imp->imp_connection->c_remote_uuid.uuid,
644                                imp->imp_remote_handle.cookie,
645                                lustre_msg_get_handle(request->rq_repmsg)->
646                                         cookie);
647                         imp->imp_remote_handle =
648                                      *lustre_msg_get_handle(request->rq_repmsg);
649                 } else {
650                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
651                                obd2cli_tgt(imp->imp_obd),
652                                imp->imp_connection->c_remote_uuid.uuid);
653                 }
654
655                 if (imp->imp_invalid) {
656                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
657                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
658                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
659                                imp->imp_obd->obd_name,
660                                obd2cli_tgt(imp->imp_obd));
661
662                         spin_lock(&imp->imp_lock);
663                         imp->imp_resend_replay = 1;
664                         spin_unlock(&imp->imp_lock);
665
666                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
667                 } else {
668                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
669                 }
670         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
671                 LASSERT(imp->imp_replayable);
672                 imp->imp_remote_handle =
673                                 *lustre_msg_get_handle(request->rq_repmsg);
674                 imp->imp_last_replay_transno = 0;
675                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
676         } else {
677                 DEBUG_REQ(D_HA, request, "evicting, flags=%x", msg_flags);
678                 imp->imp_remote_handle =
679                                 *lustre_msg_get_handle(request->rq_repmsg);
680                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
681         }
682
683         /* Sanity checks for a reconnected import. */
684         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
685                 CERROR("imp_replayable flag does not match server "
686                        "after reconnect. We should LBUG right here.\n");
687         }
688
689         if (lustre_msg_get_last_committed(request->rq_repmsg) <
690             aa->pcaa_peer_committed) {
691                 CERROR("%s went back in time (transno "LPD64
692                        " was previously committed, server now claims "LPD64
693                        ")!  See https://bugzilla.clusterfs.com/"
694                        "long_list.cgi?buglist=9646\n",
695                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
696                        lustre_msg_get_last_committed(request->rq_repmsg));
697         }
698
699 finish:
700         rc = ptlrpc_import_recovery_state_machine(imp);
701         if (rc != 0) {
702                 if (rc == -ENOTCONN) {
703                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
704                                "invalidating and reconnecting\n",
705                                obd2cli_tgt(imp->imp_obd),
706                                imp->imp_connection->c_remote_uuid.uuid);
707                         ptlrpc_connect_import(imp, NULL);
708                         RETURN(0);
709                 }
710         } else {
711                 struct obd_connect_data *ocd;
712                 struct obd_export *exp;
713
714                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
715                                          lustre_swab_connect);
716
717                 spin_lock(&imp->imp_lock);
718                 list_del(&imp->imp_conn_current->oic_item);
719                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
720                 imp->imp_last_success_conn =
721                         imp->imp_conn_current->oic_last_attempt;
722
723                 if (ocd == NULL) {
724                         spin_unlock(&imp->imp_lock);
725                         CERROR("Wrong connect data from server\n");
726                         rc = -EPROTO;
727                         GOTO(out, rc);
728                 }
729
730                 imp->imp_connect_data = *ocd;
731
732                 exp = class_conn2export(&imp->imp_dlm_handle);
733                 spin_unlock(&imp->imp_lock);
734
735                 /* check that server granted subset of flags we asked for. */
736                 LASSERTF((ocd->ocd_connect_flags &
737                           imp->imp_connect_flags_orig) ==
738                          ocd->ocd_connect_flags, LPX64" != "LPX64,
739                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
740
741                 if (!exp) {
742                         /* This could happen if export is cleaned during the 
743                            connect attempt */
744                         CERROR("Missing export for %s\n", 
745                                imp->imp_obd->obd_name);
746                         GOTO(out, rc = -ENODEV);
747                 }
748                 exp->exp_connect_flags = ocd->ocd_connect_flags;
749                 class_export_put(exp);
750
751                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
752
753                 if (!ocd->ocd_ibits_known &&
754                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
755                         CERROR("Inodebits aware server returned zero compatible"
756                                " bits?\n");
757
758                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
759                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
760                                         LUSTRE_VERSION_OFFSET_WARN ||
761                      ocd->ocd_version < LUSTRE_VERSION_CODE -
762                                         LUSTRE_VERSION_OFFSET_WARN)) {
763                         /* Sigh, some compilers do not like #ifdef in the middle
764                            of macro arguments */
765 #ifdef __KERNEL__
766                         const char *older =
767                                 "older.  Consider upgrading this client";
768 #else
769                         const char *older =
770                                 "older.  Consider recompiling this application";
771 #endif
772                         const char *newer = "newer than client version";
773
774                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
775                                       "is much %s (%s)\n",
776                                       obd2cli_tgt(imp->imp_obd),
777                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
778                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
779                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
780                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
781                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
782                                       newer : older, LUSTRE_VERSION_STRING);
783                 }
784
785                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
786                         cli->cl_max_pages_per_rpc = 
787                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
788                 }
789
790                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
791                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
792                         /* We need a per-message support flag, because 
793                            a. we don't know if the incoming connect reply
794                               supports AT or not (in reply_in_callback)
795                               until we unpack it.
796                            b. failovered server means export and flags are gone
797                               (in ptlrpc_send_reply).
798                            Can only be set when we know AT is supported at 
799                            both ends */
800                         imp->imp_msg_flags |= MSG_AT_SUPPORT;
801                 else
802                         imp->imp_msg_flags &= ~MSG_AT_SUPPORT;
803
804                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
805                         (cli->cl_max_pages_per_rpc > 0));
806         }
807
808  out:
809         if (rc != 0) {
810                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
811                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
812                         ptlrpc_deactivate_import(imp);
813
814                 if (imp->imp_recon_bk && imp->imp_last_recon) {
815                         /* Give up trying to reconnect */
816                         imp->imp_obd->obd_no_recov = 1;
817                         ptlrpc_deactivate_import(imp);
818                 }
819
820                 if (rc == -EPROTO) {
821                         struct obd_connect_data *ocd;
822                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
823                                                  sizeof *ocd,
824                                                  lustre_swab_connect);
825                         if (ocd &&
826                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
827                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
828                            /* Actually servers are only supposed to refuse
829                               connection from liblustre clients, so we should
830                               never see this from VFS context */
831                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
832                                         "(%d.%d.%d.%d)"
833                                         " refused connection from this client "
834                                         "with an incompatible version (%s).  "
835                                         "Client must be recompiled\n",
836                                         obd2cli_tgt(imp->imp_obd),
837                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
838                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
839                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
840                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
841                                         LUSTRE_VERSION_STRING);
842                                 ptlrpc_deactivate_import(imp);
843                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
844                         }
845                         RETURN(-EPROTO);
846                 }
847
848                 ptlrpc_maybe_ping_import_soon(imp);
849
850                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
851                        obd2cli_tgt(imp->imp_obd),
852                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
853         }
854         
855         spin_lock(&imp->imp_lock);
856         imp->imp_last_recon = 0;
857         spin_unlock(&imp->imp_lock);
858
859         cfs_waitq_signal(&imp->imp_recovery_waitq);
860         RETURN(rc);
861 }
862
863 static int completed_replay_interpret(struct ptlrpc_request *req,
864                                     void * data, int rc)
865 {
866         ENTRY;
867         atomic_dec(&req->rq_import->imp_replay_inflight);
868         if (req->rq_status == 0) {
869                 ptlrpc_import_recovery_state_machine(req->rq_import);
870         } else {
871                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
872                        "reconnecting\n",
873                        req->rq_import->imp_obd->obd_name, req->rq_status);
874                 ptlrpc_connect_import(req->rq_import, NULL);
875         }
876
877         RETURN(0);
878 }
879
880 static int signal_completed_replay(struct obd_import *imp)
881 {
882         struct ptlrpc_request *req;
883         ENTRY;
884
885         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
886         atomic_inc(&imp->imp_replay_inflight);
887
888         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
889         if (!req) {
890                 atomic_dec(&imp->imp_replay_inflight);
891                 RETURN(-ENOMEM);
892         }
893
894         ptlrpc_req_set_repsize(req, 1, NULL);
895         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
896         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
897         req->rq_timeout *= 3;
898         req->rq_interpret_reply = completed_replay_interpret;
899
900         ptlrpcd_add_req(req);
901         RETURN(0);
902 }
903
#ifdef __KERNEL__
/*
 * Body of the kernel thread that ptlrpc_import_recovery_state_machine()
 * spawns when it finds the import in LUSTRE_IMP_EVICTED.
 *
 * @data is the struct obd_import to invalidate.  After invalidation (and an
 * optional debug-log dump when obd_dump_on_eviction is set) the import is
 * moved to LUSTRE_IMP_RECOVER and the recovery state machine is run again.
 *
 * Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *imp = (struct obd_import *)data;
        ENTRY;

        ptlrpc_daemonize("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name,
               obd2cli_tgt(imp->imp_obd),
               imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(imp);

        if (obd_dump_on_eviction != 0) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        /* Invalidation is done; continue recovery from the RECOVER state. */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(imp);

        RETURN(0);
}
#endif
930
931 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
932 {
933         int rc = 0;
934         int inflight;
935         char *target_start;
936         int target_len;
937
938         ENTRY;
939         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
940                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
941                           &target_start, &target_len);
942                 /* Don't care about MGC eviction */
943                 if (strcmp(imp->imp_obd->obd_type->typ_name,
944                            LUSTRE_MGC_NAME) != 0) {
945                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
946                                            "%.*s; in progress operations using "
947                                            "this service will fail.\n",
948                                            target_len, target_start);
949                 }
950                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
951                        obd2cli_tgt(imp->imp_obd),
952                        imp->imp_connection->c_remote_uuid.uuid);
953
954 #ifdef __KERNEL__
955                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
956                                    CLONE_VM | CLONE_FILES);
957                 if (rc < 0)
958                         CERROR("error starting invalidate thread: %d\n", rc);
959                 else
960                         rc = 0;
961                 RETURN(rc);
962 #else
963                 ptlrpc_invalidate_import(imp);
964
965                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
966 #endif
967         }
968
969         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
970                 CDEBUG(D_HA, "replay requested by %s\n",
971                        obd2cli_tgt(imp->imp_obd));
972                 rc = ptlrpc_replay_next(imp, &inflight);
973                 if (inflight == 0 &&
974                     atomic_read(&imp->imp_replay_inflight) == 0) {
975                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
976                         rc = ldlm_replay_locks(imp);
977                         if (rc)
978                                 GOTO(out, rc);
979                 }
980                 rc = 0;
981         }
982
983         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
984                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
985                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
986                         rc = signal_completed_replay(imp);
987                         if (rc)
988                                 GOTO(out, rc);
989                 }
990
991         }
992
993         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
994                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
995                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
996                 }
997         }
998
999         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1000                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1001                        obd2cli_tgt(imp->imp_obd),
1002                        imp->imp_connection->c_remote_uuid.uuid);
1003
1004                 rc = ptlrpc_resend(imp);
1005                 if (rc)
1006                         GOTO(out, rc);
1007                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1008                 ptlrpc_activate_import(imp);
1009
1010                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1011                           &target_start, &target_len);
1012                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1013                               "using nid %s.\n", imp->imp_obd->obd_name,
1014                               target_len, target_start,
1015                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1016         }
1017
1018         if (imp->imp_state == LUSTRE_IMP_FULL) {
1019                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1020                 ptlrpc_wake_delayed(imp);
1021         }
1022
1023  out:
1024         RETURN(rc);
1025 }
1026
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() by ptlrpc_disconnect_import().
 * It ignores its argument and always returns 0; judging by the name, a zero
 * return is meant to send the waiter back to sleep (confirm against
 * l_wait_event()).
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1031
1032 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1033 {
1034         struct ptlrpc_request *req;
1035         int rq_opc, rc = 0;
1036         ENTRY;
1037
1038         switch (imp->imp_connect_op) {
1039         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1040         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1041         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1042         default:
1043                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1044                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1045                 RETURN(-EINVAL);
1046         }
1047
1048         if (ptlrpc_import_in_recovery(imp)) {
1049                 struct l_wait_info lwi;
1050                 cfs_duration_t timeout;
1051                 int idx;
1052
1053                 if (AT_OFF || (idx = import_at_get_index(imp, 
1054                                       imp->imp_client->cli_request_portal)) < 0)
1055                         timeout = cfs_time_seconds(obd_timeout);
1056                 else
1057                         timeout = cfs_time_seconds(
1058                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1059                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
1060                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1061                 rc = l_wait_event(imp->imp_recovery_waitq,
1062                                   !ptlrpc_import_in_recovery(imp), &lwi);
1063         }
1064
1065         spin_lock(&imp->imp_lock);
1066         if (imp->imp_state != LUSTRE_IMP_FULL)
1067                 GOTO(out, 0);
1068
1069         spin_unlock(&imp->imp_lock);
1070
1071         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1072         if (req) {
1073                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1074                  * it fails.  We can get through the above with a down server
1075                  * if the client doesn't know the server is gone yet. */
1076                 req->rq_no_resend = 1;
1077                 
1078 #ifndef CRAY_XT3
1079                 /* We want client umounts to happen quickly, no matter the 
1080                    server state... */
1081                 req->rq_timeout = min(req->rq_timeout, INITIAL_CONNECT_TIMEOUT);
1082 #else
1083                 /* ... but we always want liblustre clients to nicely 
1084                    disconnect, so only use the adaptive value. */
1085                 if (AT_OFF)
1086                         req->rq_timeout = obd_timeout / 3;
1087 #endif
1088
1089                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1090                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1091                 ptlrpc_req_set_repsize(req, 1, NULL);
1092                 rc = ptlrpc_queue_wait(req);
1093                 ptlrpc_req_finished(req);
1094         }
1095
1096         spin_lock(&imp->imp_lock);
1097 out:
1098         if (noclose) 
1099                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1100         else
1101                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1102         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1103         /* Try all connections in the future - bz 12758 */ 
1104         imp->imp_last_recon = 0;
1105         spin_unlock(&imp->imp_lock);
1106
1107         RETURN(rc);
1108 }
1109
1110 /* Sets maximal number of RPCs possible originating from other side of this
1111    import (server) to us and number of async RPC replies that we are not waiting
1112    for arriving */
1113 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1114 {
1115         LNetSetAsync(imp->imp_connection->c_peer, count);
1116 }
1117
1118
1119 /* Adaptive Timeout utils */
1120
1121 /* Bin into timeslices using AT_BINS bins.
1122    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1123    but still smoothing out a return to normalcy from a slow response.
1124    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1125 void at_add(struct adaptive_timeout *at, unsigned int val) {
1126         /*unsigned int old = at->at_current;*/
1127         time_t now = cfs_time_current_sec();
1128
1129         LASSERT(at);
1130 #if 0
1131         CDEBUG(D_INFO, "add %u to %p time=%lu tb=%lu v=%u (%u %u %u %u)\n", 
1132                val, at, now - at->at_binstart, at->at_binlimit, at->at_current,
1133                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1134 #endif
1135         if (val == 0) 
1136                 /* 0's don't count, because we never want our timeout to 
1137                    drop to 0, and because 0 could mean an error */
1138                 return;
1139
1140         spin_lock(&at->at_lock);
1141
1142         if (unlikely(at->at_binstart == 0)) {
1143                 /* Special case to remove default from history */
1144                 at->at_current = val;
1145                 at->at_worst_ever = val;
1146                 at->at_worst_time = now;
1147                 at->at_hist[0] = val;
1148                 at->at_binstart = now;
1149         } else if (now - at->at_binstart < at->at_binlimit ) {
1150                 /* in bin 0 */
1151                 at->at_hist[0] = max(val, at->at_hist[0]);
1152                 at->at_current = max(val, at->at_current);
1153         } else {
1154                 int i, shift;
1155                 unsigned int maxv = val;
1156                 /* move bins over */
1157                 shift = (now - at->at_binstart) / at->at_binlimit;
1158                 LASSERT(shift > 0);
1159                 for(i = AT_BINS - 1; i >= 0; i--) {
1160                         if (i >= shift) {
1161                                 at->at_hist[i] = at->at_hist[i - shift];
1162                                 maxv = max(maxv, at->at_hist[i]);
1163                         } else {
1164                                 at->at_hist[i] = 0;
1165                         }
1166                 }
1167                 at->at_hist[0] = val;
1168                 at->at_current = maxv;
1169                 at->at_binstart += shift * at->at_binlimit;
1170         }
1171
1172         if ((at->at_flags & AT_FLG_MIN) && 
1173             (at->at_current < adaptive_timeout_min))
1174                 at->at_current = adaptive_timeout_min;
1175
1176         if (at->at_current > at->at_worst_ever) {
1177                 at->at_worst_ever = at->at_current;
1178                 at->at_worst_time = now;
1179         }
1180
1181         if (at->at_flags & AT_FLG_NOHIST)
1182                 /* Only keep last reported val; keeping the rest of the history
1183                    for proc only */
1184                 at->at_current = val;
1185
1186 #if 0
1187         if (at->at_current != old)
1188                 CDEBUG(D_ADAPTTO, "AT change: old=%u new=%u delta=%d (val=%u) "
1189                        "hist %u %u %u %u\n",
1190                        old, at->at_current, at->at_current - old, val,
1191                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1192                        at->at_hist[3]);
1193 #endif
1194         spin_unlock(&at->at_lock);
1195 }
1196
1197 /* Find the imp_at index for a given portal; assign if space available */
1198 int import_at_get_index(struct obd_import *imp, int portal) {
1199         struct imp_at *at = &imp->imp_at;
1200         int i;
1201
1202         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1203                 if (at->iat_portal[i] == portal) 
1204                         return i;
1205                 if (at->iat_portal[i] == 0)
1206                         /* unused */
1207                         break;
1208         }
1209
1210         /* Not found in list, add it under a lock */
1211         spin_lock(&imp->imp_lock);
1212
1213         /* Check unused under lock */
1214         for (; i < IMP_AT_MAX_PORTALS; i++) {
1215                 if (at->iat_portal[i] == portal) 
1216                         goto out;
1217                 if (at->iat_portal[i] == 0)
1218                         /* unused */
1219                         break;
1220         }
1221
1222         if (i >= IMP_AT_MAX_PORTALS) {
1223                 CERROR("Tried to use more than %d portals, not enough room "
1224                        "in adaptive timeout stats.\n", IMP_AT_MAX_PORTALS);
1225                 i = -1;
1226                 goto out;
1227         }
1228         at->iat_portal[i] = portal;
1229
1230 out:
1231         spin_unlock(&imp->imp_lock);
1232         return i;
1233 }
1234
1235 /* Get total expected lock callback time (net + service).
1236    Since any early reply will only affect the RPC wait time, and not
1237    any local lock timer we set based on the return value here,
1238    we should be conservative. */
1239 int import_at_get_ldlm(struct obd_import *imp) {
1240         int idx, tot;
1241         
1242         if (!imp || !imp->imp_client || AT_OFF)
1243                 return obd_timeout;
1244         
1245         tot = at_get(&imp->imp_at.iat_net_latency);
1246         idx = import_at_get_index(imp, imp->imp_client->cli_request_portal);
1247         if (idx < 0)
1248                 tot += obd_timeout;
1249         else
1250                 tot += at_get(&imp->imp_at.iat_service_estimate[idx]);
1251
1252         /* add an arbitrary minimum: 150% + 10 sec */
1253         tot += (tot >> 1) + 10;
1254         return tot;
1255 }
1256