Whamcloud - gitweb
ee7d3a5c9d8d22de1a16b44580058e7dc60ee5ae
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_ha.h>
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_export.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #include "ptlrpc_internal.h"
40
/* Arguments stored in the rq_async_args area of a connect request by
 * ptlrpc_connect_import() and handed back to ptlrpc_connect_interpret()
 * as its data argument. */
41 struct ptlrpc_connect_async_args {
         /* imp_peer_committed_transno sampled before a reconnect is sent;
          * left at 0 for an initial connect */
42          __u64 pcaa_peer_committed;
        /* non-zero when the import had no remote handle in use when the
         * request was built, i.e. this is the first connect, not a reconnect */
43         int pcaa_initial_connect;
44 };
45
/*
 * Move @imp to @state, tracing the transition at D_HA level.
 *
 * A CLOSED import should remain so: when imp_state is LUSTRE_IMP_CLOSED the
 * request is ignored.  This variant does not take imp->imp_lock itself; use
 * IMPORT_SET_STATE() when the lock is not already held.
 *
 * Both parameters are parenthesized at every use so callers may pass
 * arbitrary expressions.  They are still evaluated more than once, so the
 * arguments must be free of side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)

/*
 * Same as IMPORT_SET_STATE_NOLOCK(), but takes and releases imp->imp_lock
 * around the state change.
 */
#define IMPORT_SET_STATE(imp, state)            \
do {                                            \
        spin_lock(&(imp)->imp_lock);            \
        IMPORT_SET_STATE_NOLOCK(imp, state);    \
        spin_unlock(&(imp)->imp_lock);          \
} while(0)
64
65
66 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
67                                     void * data, int rc);
68 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         spin_lock(&imp->imp_lock);
78
79         imp->imp_generation++;
80         imp->imp_state =  LUSTRE_IMP_NEW;
81
82         spin_unlock(&imp->imp_lock);
83
84         return 0;
85 }
86 EXPORT_SYMBOL(ptlrpc_init_import);
87
#define UUID_STR "_UUID"
/*
 * Find the printable core of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated string to examine
 * @prefix     - optional leading text to skip; may be NULL, and is ignored
 *               when @uuid does not start with it
 * @uuid_start - out: points into @uuid, just past @prefix when it matched,
 *               otherwise at @uuid itself
 * @uuid_len   - out: number of characters from *@uuid_start, not counting a
 *               trailing UUID_STR if one is present
 *
 * The result is meant for "%.*s"-style printing; nothing is copied.
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        /* sizeof counts the terminating NUL, hence the -1; kept as int so
         * the length comparison below is signed on both sides */
        const int suffix_len = (int)sizeof(UUID_STR) - 1;
        char *start = uuid;
        int len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = (int)strlen(start);

        /* only strip the suffix when the string is long enough to hold it */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
104
105 /* Returns true if import was FULL, false if import was already not
106  * connected.
107  * @imp - import to be disconnected
108  * @conn_cnt - connection count (epoch) of the request that timed out
109  *             and caused the disconnection.  In some cases, multiple
110  *             inflight requests can fail to a single target (e.g. OST
111  *             bulk requests) and if one has already caused a reconnection
112  *             (increasing the import->conn_cnt) the older failure should
113  *             not also cause a reconnection.  If zero it forces a reconnect.
114  */
115 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
116 {
117         int rc = 0;
118
119         spin_lock(&imp->imp_lock);
120
121         if (imp->imp_state == LUSTRE_IMP_FULL &&
122             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
123                 char *target_start;
124                 int   target_len;
125
126                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
127                           &target_start, &target_len);
128                 if (imp->imp_replayable) {
129                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
130                                "%s was lost; in progress operations using this "
131                                "service will wait for recovery to complete.\n",
132                                imp->imp_obd->obd_name, target_len, target_start,
133                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
136                                "%.*s via nid %s was lost; in progress "
137                                "operations using this service will fail.\n",
138                                imp->imp_obd->obd_name, target_len, target_start, 
139                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
140                 }
141                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
142                 spin_unlock(&imp->imp_lock);
143     
144                 if (obd_dump_on_timeout)
145                         libcfs_debug_dumplog();
146
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock(&imp->imp_lock);
151                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
152                        imp->imp_client->cli_name, imp,
153                        (imp->imp_state == LUSTRE_IMP_FULL &&
154                         imp->imp_conn_cnt > conn_cnt) ?
155                        "reconnected" : "not connected", imp->imp_conn_cnt,
156                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
157         }
158
159         return rc;
160 }
161
162 /*
163  * This acts as a barrier; all existing requests are rejected, and
164  * no new requests will be accepted until the import is valid again.
165  */
166 void ptlrpc_deactivate_import(struct obd_import *imp)
167 {
168         ENTRY;
169
170         spin_lock(&imp->imp_lock);
171         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
172         imp->imp_invalid = 1;
173         imp->imp_generation++;
174         spin_unlock(&imp->imp_lock);
175
176         ptlrpc_abort_inflight(imp);
177         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
178 }
179
180 /*
181  * This function will invalidate the import, if necessary, then block
182  * for all the RPC completions, and finally notify the obd to
183  * invalidate its state (ie cancel locks, clear pending requests,
184  * etc).
185  */
186 void ptlrpc_invalidate_import(struct obd_import *imp)
187 {
188         struct list_head *tmp, *n;
189         struct ptlrpc_request *req;
190         struct l_wait_info lwi;
191         int rc;
192
193         atomic_inc(&imp->imp_inval_count);
194
195         if (!imp->imp_invalid)
196                 ptlrpc_deactivate_import(imp);
197
198         LASSERT(imp->imp_invalid);
199
200         /* wait for all requests to error out and call completion callbacks.
201            Cap it at obd_timeout -- these should all have been locally
202            cancelled by ptlrpc_abort_inflight. */
203         lwi = LWI_TIMEOUT_INTERVAL(
204                 cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
205                 cfs_time_seconds(1), NULL, NULL);
206         rc = l_wait_event(imp->imp_recovery_waitq,
207                           (atomic_read(&imp->imp_inflight) == 0), &lwi);
208
209         if (rc) {
210                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
211                        obd2cli_tgt(imp->imp_obd), rc,
212                        atomic_read(&imp->imp_inflight));
213                 spin_lock(&imp->imp_lock);
214                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
215                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
216                         DEBUG_REQ(D_ERROR, req, "still on sending list");
217                 }
218                 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
219                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
220                         DEBUG_REQ(D_ERROR, req, "still on delayed list");
221                 }
222                 spin_unlock(&imp->imp_lock);
223         }
224
225         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
226
227         atomic_dec(&imp->imp_inval_count);
228         cfs_waitq_signal(&imp->imp_recovery_waitq);
229 }
230
231 /* unset imp_invalid */
232 void ptlrpc_activate_import(struct obd_import *imp)
233 {
234         struct obd_device *obd = imp->imp_obd;
235
236         spin_lock(&imp->imp_lock);
237         imp->imp_invalid = 0;
238         spin_unlock(&imp->imp_lock);
239
240         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
241 }
242
243 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
244 {
245         ENTRY;
246
247         LASSERT(!imp->imp_dlm_fake);
248
249         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
250                 if (!imp->imp_replayable) {
251                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
252                                "auto-deactivating\n",
253                                obd2cli_tgt(imp->imp_obd),
254                                imp->imp_connection->c_remote_uuid.uuid,
255                                imp->imp_obd->obd_name);
256                         ptlrpc_deactivate_import(imp);
257                 }
258
259                 CDEBUG(D_HA, "%s: waking up pinger\n",
260                        obd2cli_tgt(imp->imp_obd));
261
262                 spin_lock(&imp->imp_lock);
263                 imp->imp_force_verify = 1;
264                 spin_unlock(&imp->imp_lock);
265
266                 ptlrpc_pinger_wake_up();
267         }
268         EXIT;
269 }
270
/*
 * Pick the entry of imp->imp_conn_list that the next connect attempt will
 * use and install it as the import's current connection.
 *
 * The whole selection runs under imp->imp_lock:
 *  - entries attempted within the last CONNECTION_SWITCH_MIN seconds are
 *    skipped;
 *  - the first entry that was never attempted, or not attempted since
 *    imp_last_success_conn, is taken immediately;
 *  - otherwise the entry with the oldest oic_last_attempt is taken;
 *  - if every entry was skipped, imp_conn_current is reused.
 *
 * The chosen entry's oic_last_attempt is stamped with the current time and
 * a new reference on its connection is stored in both imp->imp_connection
 * and the DLM export's exp_connection, dropping whatever was there before.
 *
 * Returns 0 on success, -EINVAL if the connection list is empty.
 */
271 static int import_select_connection(struct obd_import *imp)
272 {
273         struct obd_import_conn *imp_conn = NULL, *conn;
274         struct obd_export *dlmexp;
        /* stays 1 only when the choice was made by the least-recently-used
         * fallback inside the loop below */
275         int tried_all = 1;
276         ENTRY;
277
278         spin_lock(&imp->imp_lock);
279
280         if (list_empty(&imp->imp_conn_list)) {
281                 CERROR("%s: no connections available\n",
282                         imp->imp_obd->obd_name);
283                 spin_unlock(&imp->imp_lock);
284                 RETURN(-EINVAL);
285         }
286
287         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
288                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
289                        imp->imp_obd->obd_name,
290                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
291                        conn->oic_last_attempt);
292                 
293                 /* Don't thrash connections */
294                 if (cfs_time_before_64(cfs_time_current_64(),
295                                      conn->oic_last_attempt + 
296                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
297                         continue;
298                 }
299
300                 /* If we have not tried this connection since the
301                    last successful attempt, go with this one */
302                 if ((conn->oic_last_attempt == 0) ||
303                     cfs_time_beforeq_64(conn->oic_last_attempt,
304                                        imp->imp_last_success_conn)) {
305                         imp_conn = conn;
306                         tried_all = 0;
307                         break;
308                 }
309
310                 /* If all of the connections have already been tried
311                    since the last successful connection; just choose the
312                    least recently used */
313                 if (!imp_conn)
314                         imp_conn = conn;
315                 else if (cfs_time_before_64(conn->oic_last_attempt,
316                                             imp_conn->oic_last_attempt))
317                         imp_conn = conn;
318         }
319
320         /* if not found, simply choose the current one */
321         if (!imp_conn) {
322                 LASSERT(imp->imp_conn_current);
323                 imp_conn = imp->imp_conn_current;
324                 tried_all = 0;
325         }
326         LASSERT(imp_conn->oic_conn);
327
328         /* If we've tried everything, and we're back to the beginning of the
329            list, increase our timeout and try again. It will be reset when
330            we do finally connect. (FIXME: really we should wait for all network
331            state associated with the last connection attempt to drain before
332            trying to reconnect on it.) */
333         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
334             !imp->imp_recon_bk /* not retrying */) {
                /* grow the net latency estimate by CONNECTION_SWITCH_INC
                 * while it is still below CONNECTION_SWITCH_MAX */
335                 if (at_get(&imp->imp_at.iat_net_latency) <
336                     CONNECTION_SWITCH_MAX) {
337                         at_add(&imp->imp_at.iat_net_latency,
338                                at_get(&imp->imp_at.iat_net_latency) +
339                                CONNECTION_SWITCH_INC);
340                 }
341                 LASSERT(imp_conn->oic_last_attempt);
342                 CWARN("%s: tried all connections, increasing latency to %ds\n",
343                       imp->imp_obd->obd_name,
344                       at_get(&imp->imp_at.iat_net_latency));
345         }
346
347         imp_conn->oic_last_attempt = cfs_time_current_64();
348
349         /* switch connection, don't mind if it's same as the current one */
350         if (imp->imp_connection)
351                 ptlrpc_put_connection(imp->imp_connection);
352         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
353
        /* keep the DLM export pointing at the same connection as the import */
354         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
355         LASSERT(dlmexp != NULL);
356         if (dlmexp->exp_connection)
357                 ptlrpc_put_connection(dlmexp->exp_connection);
358         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
359         class_export_put(dlmexp);
360
        /* the console message is only printed when an earlier current
         * connection is being replaced, not on the very first selection */
361         if (imp->imp_conn_current != imp_conn) {
362                 if (imp->imp_conn_current)
363                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
364                                       imp->imp_obd->obd_name,
365                                       imp_conn->oic_uuid.uuid,
366                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
367                 imp->imp_conn_current = imp_conn;
368         }
369
370         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
371                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
372                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
373
374         spin_unlock(&imp->imp_lock);
375
376         RETURN(0);
377 }
378
/*
 * Start an asynchronous connect or reconnect of @imp.
 *
 * @imp      - import to connect
 * @new_uuid - if non-NULL, converted to an obd_uuid and passed to
 *             import_set_conn_priority() before a connection is selected
 *
 * The import is moved to CONNECTING and imp_conn_cnt is incremented, a
 * connection is chosen with import_select_connection(), and a connect
 * request is built and handed to ptlrpcd_add_req().  The reply is processed
 * by ptlrpc_connect_interpret().  If any step after the state change fails,
 * the import is put back to DISCON.
 *
 * Returns 0 if the request was queued or the import is already FULL,
 * -EINVAL for a CLOSED import, -EALREADY if a connect is already in
 * progress, -ENOMEM if no request could be allocated, or the error returned
 * by import_set_conn_priority(), import_select_connection() or
 * obd_reconnect().
 */
379 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
380 {
381         struct obd_device *obd = imp->imp_obd;
382         int initial_connect = 0;
383         int rc;
384         __u64 committed_before_reconnect = 0;
385         struct ptlrpc_request *request;
        /* size[] and tmp[] describe the five request buffers; tmp[0] is
         * NULL, the rest point at the data for each buffer.  size[] is
         * reused further down for the reply buffer sizes. */
386         int size[] = { sizeof(struct ptlrpc_body),
387                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
388                        sizeof(obd->obd_uuid),
389                        sizeof(imp->imp_dlm_handle),
390                        sizeof(imp->imp_connect_data) };
391         char *tmp[] = { NULL,
392                         obd2cli_tgt(imp->imp_obd),
393                         obd->obd_uuid.uuid,
394                         (char *)&imp->imp_dlm_handle,
395                         (char *)&imp->imp_connect_data };
396         struct ptlrpc_connect_async_args *aa;
397
398         ENTRY;
        /* Only an import that is neither CLOSED, FULL nor CONNECTING may
         * start a new connect; the check and the switch to CONNECTING are
         * done under one hold of imp_lock. */
399         spin_lock(&imp->imp_lock);
400         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
401                 spin_unlock(&imp->imp_lock);
402                 CERROR("can't connect to a closed import\n");
403                 RETURN(-EINVAL);
404         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
405                 spin_unlock(&imp->imp_lock);
406                 CERROR("already connected\n");
407                 RETURN(0);
408         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
409                 spin_unlock(&imp->imp_lock);
410                 CERROR("already connecting\n");
411                 RETURN(-EALREADY);
412         }
413
414         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
415
416         imp->imp_conn_cnt++;
417         imp->imp_resend_replay = 0;
418
        /* An unused remote handle means this is the first connect;
         * otherwise remember the peer's committed transno so the reply
         * handler can compare it with what the server reports. */
419         if (!lustre_handle_is_used(&imp->imp_remote_handle))
420                 initial_connect = 1;
421         else
422                 committed_before_reconnect = imp->imp_peer_committed_transno;
423
424         spin_unlock(&imp->imp_lock);
425
426         if (new_uuid) {
427                 struct obd_uuid uuid;
428
429                 obd_str2uuid(&uuid, new_uuid);
430                 rc = import_set_conn_priority(imp, &uuid);
431                 if (rc)
432                         GOTO(out, rc);
433         }
434
435         rc = import_select_connection(imp);
436         if (rc)
437                 GOTO(out, rc);
438
439         /* last in connection list */
440         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
441                 if (imp->imp_initial_recov_bk && initial_connect) {
442                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
443                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
444                         /* Don't retry if connect fails */
                        /* rc is borrowed here as the value buffer (0) for
                         * KEY_INIT_RECOV; the call's own return value is
                         * not checked */
445                         rc = 0;
446                         obd_set_info_async(obd->obd_self_export,
447                                            strlen(KEY_INIT_RECOV),
448                                            KEY_INIT_RECOV,
449                                            sizeof(rc), &rc, NULL);
450                 }
451                 if (imp->imp_recon_bk) {
452                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
453                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
454                         spin_lock(&imp->imp_lock);
455                         imp->imp_last_recon = 1;
456                         spin_unlock(&imp->imp_lock);
457                 }
458         }
459
460         /* Reset connect flags to the originally requested flags, in case
461          * the server is updated on-the-fly we will get the new features. */
462         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
463         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
464                            &obd->obd_uuid, &imp->imp_connect_data);
465         if (rc)
466                 GOTO(out, rc);
467
468         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
469                                   5, size, tmp);
470         if (!request)
471                 GOTO(out, rc = -ENOMEM);
472
473 #ifndef __KERNEL__
474         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
475 #endif
476         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
477
478         request->rq_send_state = LUSTRE_IMP_CONNECTING;
479         /* Allow a slightly larger reply for future growth compatibility */
480         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
481                               16 * sizeof(__u64);
482         ptlrpc_req_set_repsize(request, 2, size);
483         request->rq_interpret_reply = ptlrpc_connect_interpret;
484
        /* the async args live inside the request itself; make sure at
         * compile time that they fit */
485         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
486         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
487         memset(aa, 0, sizeof *aa);
488
489         aa->pcaa_peer_committed = committed_before_reconnect;
490         aa->pcaa_initial_connect = initial_connect;
491         if (aa->pcaa_initial_connect) {
                /* set replayable for the initial connect; the reply handler
                 * sets imp_replayable again from MSG_CONNECT_REPLAYABLE */
492                 spin_lock(&imp->imp_lock);
493                 imp->imp_replayable = 1;
494                 spin_unlock(&imp->imp_lock);
495                 if (AT_OFF)
496                         /* AT will use INITIAL_CONNECT_TIMEOUT the first
497                            time, adaptive after that. */
498                         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
499         }
500
501         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
502                   aa->pcaa_initial_connect ? "initial " : "re", 
503                   imp->imp_conn_cnt);
504         ptlrpcd_add_req(request);
505         rc = 0;
506 out:
        /* any failure after the switch to CONNECTING drops back to DISCON */
507         if (rc != 0) {
508                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
509         }
510
511         RETURN(rc);
512 }
513 EXPORT_SYMBOL(ptlrpc_connect_import);
514
515 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
516 {
517 #ifdef __KERNEL__
518         struct obd_import_conn *imp_conn;
519 #endif
520         int wake_pinger = 0;
521
522         ENTRY;
523
524         spin_lock(&imp->imp_lock);
525         if (list_empty(&imp->imp_conn_list))
526                 GOTO(unlock, 0);
527
528 #ifdef __KERNEL__
529         imp_conn = list_entry(imp->imp_conn_list.prev,
530                               struct obd_import_conn,
531                               oic_item);
532
533         if (imp->imp_conn_current != imp_conn) {
534                 ptlrpc_ping_import_soon(imp);
535                 wake_pinger = 1;
536         }
537
538 #else
539         /* liblustre has no pinger thead, so we wakup pinger anyway */
540         wake_pinger = 1;
541 #endif 
542  unlock:
543         spin_unlock(&imp->imp_lock);
544
545         if (wake_pinger)
546                 ptlrpc_pinger_wake_up();
547
548         EXIT;
549 }
550
551 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
552                                     void * data, int rc)
553 {
554         struct ptlrpc_connect_async_args *aa = data;
555         struct obd_import *imp = request->rq_import;
556         struct client_obd *cli = &imp->imp_obd->u.cli;
557         struct lustre_handle old_hdl;
558         int msg_flags;
559         ENTRY;
560
561         spin_lock(&imp->imp_lock);
562         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
563                 spin_unlock(&imp->imp_lock);
564                 RETURN(0);
565         }
566         spin_unlock(&imp->imp_lock);
567
568         if (rc)
569                 GOTO(out, rc);
570
571         LASSERT(imp->imp_conn_current);
572
573         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
574
575         /* All imports are pingable */
576         spin_lock(&imp->imp_lock);
577         imp->imp_pingable = 1;
578
579         if (aa->pcaa_initial_connect) {
580                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
581                         imp->imp_replayable = 1;
582                         spin_unlock(&imp->imp_lock);
583                         CDEBUG(D_HA, "connected to replayable target: %s\n",
584                                obd2cli_tgt(imp->imp_obd));
585                 } else {
586                         imp->imp_replayable = 0;
587                         spin_unlock(&imp->imp_lock);
588                 }
589
590                 if (msg_flags & MSG_CONNECT_NEXT_VER) {
591                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
592                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
593                                obd2cli_tgt(imp->imp_obd));
594                 } else {
595                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
596                                obd2cli_tgt(imp->imp_obd));
597                 }
598
599                 imp->imp_remote_handle =
600                                 *lustre_msg_get_handle(request->rq_repmsg);
601
602                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
603                 ptlrpc_activate_import(imp);
604                 GOTO(finish, rc = 0);
605         } else {
606                 spin_unlock(&imp->imp_lock);
607         }
608
609         /* Determine what recovery state to move the import to. */
610         if (MSG_CONNECT_RECONNECT & msg_flags) {
611                 memset(&old_hdl, 0, sizeof(old_hdl));
612                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
613                             sizeof (old_hdl))) {
614                         CERROR("%s@%s didn't like our handle "LPX64
615                                ", failed\n", obd2cli_tgt(imp->imp_obd),
616                                imp->imp_connection->c_remote_uuid.uuid,
617                                imp->imp_dlm_handle.cookie);
618                         GOTO(out, rc = -ENOTCONN);
619                 }
620
621                 if (memcmp(&imp->imp_remote_handle,
622                            lustre_msg_get_handle(request->rq_repmsg),
623                            sizeof(imp->imp_remote_handle))) {
624                         int level = D_ERROR;
625                         /* Old MGC can reconnect to a restarted MGS */
626                         if (strcmp(imp->imp_obd->obd_type->typ_name,
627                                    LUSTRE_MGC_NAME) == 0) {
628                                 level = D_CONFIG;
629                         }
630                         CDEBUG(level, 
631                                "%s@%s changed handle from "LPX64" to "LPX64
632                                "; copying, but this may foreshadow disaster\n",
633                                obd2cli_tgt(imp->imp_obd),
634                                imp->imp_connection->c_remote_uuid.uuid,
635                                imp->imp_remote_handle.cookie,
636                                lustre_msg_get_handle(request->rq_repmsg)->
637                                         cookie);
638                         imp->imp_remote_handle =
639                                      *lustre_msg_get_handle(request->rq_repmsg);
640                 } else {
641                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
642                                obd2cli_tgt(imp->imp_obd),
643                                imp->imp_connection->c_remote_uuid.uuid);
644                 }
645
646                 if (imp->imp_invalid) {
647                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
648                                "marking evicted\n", imp->imp_obd->obd_name);
649                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
650                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
651                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
652                                imp->imp_obd->obd_name,
653                                obd2cli_tgt(imp->imp_obd));
654
655                         spin_lock(&imp->imp_lock);
656                         imp->imp_resend_replay = 1;
657                         spin_unlock(&imp->imp_lock);
658
659                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
660                 } else {
661                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
662                 }
663         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
664                 LASSERT(imp->imp_replayable);
665                 imp->imp_remote_handle =
666                                 *lustre_msg_get_handle(request->rq_repmsg);
667                 imp->imp_last_replay_transno = 0;
668                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
669         } else {
670                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
671                           "flags reconnect/recovering not set: %x)",msg_flags);
672                 imp->imp_remote_handle =
673                                 *lustre_msg_get_handle(request->rq_repmsg);
674                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
675         }
676
677         /* Sanity checks for a reconnected import. */
678         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
679                 CERROR("imp_replayable flag does not match server "
680                        "after reconnect. We should LBUG right here.\n");
681         }
682
683         if (lustre_msg_get_last_committed(request->rq_repmsg) <
684             aa->pcaa_peer_committed) {
685                 CERROR("%s went back in time (transno "LPD64
686                        " was previously committed, server now claims "LPD64
687                        ")!  See https://bugzilla.clusterfs.com/"
688                        "long_list.cgi?buglist=9646\n",
689                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
690                        lustre_msg_get_last_committed(request->rq_repmsg));
691         }
692
693 finish:
694         rc = ptlrpc_import_recovery_state_machine(imp);
695         if (rc != 0) {
696                 if (rc == -ENOTCONN) {
697                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
698                                "invalidating and reconnecting\n",
699                                obd2cli_tgt(imp->imp_obd),
700                                imp->imp_connection->c_remote_uuid.uuid);
701                         ptlrpc_connect_import(imp, NULL);
702                         RETURN(0);
703                 }
704         } else {
705                 struct obd_connect_data *ocd;
706                 struct obd_export *exp;
707
708                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
709                                          lustre_swab_connect);
710                 spin_lock(&imp->imp_lock);
711                 list_del(&imp->imp_conn_current->oic_item);
712                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
713                 imp->imp_last_success_conn =
714                         imp->imp_conn_current->oic_last_attempt;
715
716                 if (ocd == NULL) {
717                         spin_unlock(&imp->imp_lock);
718                         CERROR("Wrong connect data from server\n");
719                         rc = -EPROTO;
720                         GOTO(out, rc);
721                 }
722
723                 imp->imp_connect_data = *ocd;
724
725                 exp = class_conn2export(&imp->imp_dlm_handle);
726                 spin_unlock(&imp->imp_lock);
727
728                 /* check that server granted subset of flags we asked for. */
729                 LASSERTF((ocd->ocd_connect_flags &
730                           imp->imp_connect_flags_orig) ==
731                          ocd->ocd_connect_flags, LPX64" != "LPX64,
732                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
733
734                 if (!exp) {
735                         /* This could happen if export is cleaned during the 
736                            connect attempt */
737                         CERROR("Missing export for %s\n", 
738                                imp->imp_obd->obd_name);
739                         GOTO(out, rc = -ENODEV);
740                 }
741                 exp->exp_connect_flags = ocd->ocd_connect_flags;
742                 imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags;
743                 class_export_put(exp);
744
745                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
746
747                 if (!ocd->ocd_ibits_known &&
748                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
749                         CERROR("Inodebits aware server returned zero compatible"
750                                " bits?\n");
751
752                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
753                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
754                                         LUSTRE_VERSION_OFFSET_WARN ||
755                      ocd->ocd_version < LUSTRE_VERSION_CODE -
756                                         LUSTRE_VERSION_OFFSET_WARN)) {
757                         /* Sigh, some compilers do not like #ifdef in the middle
758                            of macro arguments */
759 #ifdef __KERNEL__
760                         const char *older =
761                                 "older.  Consider upgrading this client";
762 #else
763                         const char *older =
764                                 "older.  Consider recompiling this application";
765 #endif
766                         const char *newer = "newer than client version";
767
768                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
769                                       "is much %s (%s)\n",
770                                       obd2cli_tgt(imp->imp_obd),
771                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
772                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
773                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
774                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
775                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
776                                       newer : older, LUSTRE_VERSION_STRING);
777                 }
778
779                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
780                         /* We sent to the server ocd_cksum_types with bits set
781                          * for algorithms we understand. The server masked off
782                          * the checksum types it doesn't support */
783                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
784                                 LCONSOLE_WARN("The negotiation of the checksum "
785                                               "alogrithm to use with server %s "
786                                               "failed (%x/%x), disabling "
787                                               "checksums\n",
788                                               obd2cli_tgt(imp->imp_obd),
789                                               ocd->ocd_cksum_types,
790                                               OBD_CKSUM_ALL);
791                                 cli->cl_checksum = 0;
792                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
793                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
794                         } else {
795                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
796
797                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
798                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
799                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
800                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
801                                 else
802                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
803                         }
804                 } else {
805                         /* The server does not support OBD_CONNECT_CKSUM.
806                          * Enforce CRC32 for backward compatibility*/
807                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
808                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
809                 }
810
811                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
812                         cli->cl_max_pages_per_rpc = 
813                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
814                 }
815
816                 imp->imp_obd->obd_namespace->ns_connect_flags = 
817                         ocd->ocd_connect_flags;
818                 imp->imp_obd->obd_namespace->ns_orig_connect_flags = 
819                         ocd->ocd_connect_flags;
820
821                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
822                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
823                         /* We need a per-message support flag, because 
824                            a. we don't know if the incoming connect reply
825                               supports AT or not (in reply_in_callback)
826                               until we unpack it.
827                            b. failovered server means export and flags are gone
828                               (in ptlrpc_send_reply).
829                            Can only be set when we know AT is supported at 
830                            both ends */
831                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
832                 else
833                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
834
835                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
836                         (cli->cl_max_pages_per_rpc > 0));
837         }
838
839  out:
840         if (rc != 0) {
841                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
842                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
843                     (request->rq_import_generation == imp->imp_generation))
844                         ptlrpc_deactivate_import(imp);
845
846                 if (imp->imp_recon_bk && imp->imp_last_recon) {
847                         /* Give up trying to reconnect */
848                         imp->imp_obd->obd_no_recov = 1;
849                         ptlrpc_deactivate_import(imp);
850                 }
851
852                 if (rc == -EPROTO) {
853                         struct obd_connect_data *ocd;
854                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
855                                                  sizeof *ocd,
856                                                  lustre_swab_connect);
857                         if (ocd &&
858                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
859                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
860                            /* Actually servers are only supposed to refuse
861                               connection from liblustre clients, so we should
862                               never see this from VFS context */
863                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
864                                         "(%d.%d.%d.%d)"
865                                         " refused connection from this client "
866                                         "with an incompatible version (%s).  "
867                                         "Client must be recompiled\n",
868                                         obd2cli_tgt(imp->imp_obd),
869                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
870                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
871                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
872                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
873                                         LUSTRE_VERSION_STRING);
874                                 ptlrpc_deactivate_import(imp);
875                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
876                         }
877                         RETURN(-EPROTO);
878                 }
879
880                 ptlrpc_maybe_ping_import_soon(imp);
881
882                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
883                        obd2cli_tgt(imp->imp_obd),
884                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
885         }
886         
887         spin_lock(&imp->imp_lock);
888         imp->imp_last_recon = 0;
889         spin_unlock(&imp->imp_lock);
890
891         cfs_waitq_signal(&imp->imp_recovery_waitq);
892         RETURN(rc);
893 }
894
895 static int completed_replay_interpret(struct ptlrpc_request *req,
896                                     void * data, int rc)
897 {
898         ENTRY;
899         atomic_dec(&req->rq_import->imp_replay_inflight);
900         if (req->rq_status == 0) {
901                 ptlrpc_import_recovery_state_machine(req->rq_import);
902         } else {
903                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
904                        "reconnecting\n",
905                        req->rq_import->imp_obd->obd_name, req->rq_status);
906                 ptlrpc_connect_import(req->rq_import, NULL);
907         }
908
909         RETURN(0);
910 }
911
912 static int signal_completed_replay(struct obd_import *imp)
913 {
914         struct ptlrpc_request *req;
915         ENTRY;
916
917         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
918         atomic_inc(&imp->imp_replay_inflight);
919
920         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
921         if (!req) {
922                 atomic_dec(&imp->imp_replay_inflight);
923                 RETURN(-ENOMEM);
924         }
925
926         ptlrpc_req_set_repsize(req, 1, NULL);
927         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
928         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
929         req->rq_timeout *= 3;
930         req->rq_interpret_reply = completed_replay_interpret;
931
932         ptlrpcd_add_req(req);
933         RETURN(0);
934 }
935
#ifdef __KERNEL__
/*
 * Body of the "ll_imp_inval" kernel thread.
 *
 * Invalidates the import, optionally dumps the debug log (when
 * obd_dump_on_eviction is set), then moves the import to
 * LUSTRE_IMP_RECOVER and re-enters the recovery state machine.
 *
 * \param data  the struct obd_import to invalidate
 *
 * \retval 0 always
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *import = data;
        ENTRY;

        ptlrpc_daemonize("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               import->imp_obd->obd_name, obd2cli_tgt(import->imp_obd),
               import->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(import);

        if (obd_dump_on_eviction) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        IMPORT_SET_STATE(import, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(import);

        RETURN(0);
}
#endif
962
963 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
964 {
965         int rc = 0;
966         int inflight;
967         char *target_start;
968         int target_len;
969
970         ENTRY;
971         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
972                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
973                           &target_start, &target_len);
974                 /* Don't care about MGC eviction */
975                 if (strcmp(imp->imp_obd->obd_type->typ_name,
976                            LUSTRE_MGC_NAME) != 0) {
977                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
978                                            "%.*s; in progress operations using "
979                                            "this service will fail.\n",
980                                            target_len, target_start);
981                 }
982                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
983                        obd2cli_tgt(imp->imp_obd),
984                        imp->imp_connection->c_remote_uuid.uuid);
985
986 #ifdef __KERNEL__
987                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
988                                    CLONE_VM | CLONE_FILES);
989                 if (rc < 0)
990                         CERROR("error starting invalidate thread: %d\n", rc);
991                 else
992                         rc = 0;
993                 RETURN(rc);
994 #else
995                 ptlrpc_invalidate_import(imp);
996
997                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
998 #endif
999         }
1000
1001         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1002                 CDEBUG(D_HA, "replay requested by %s\n",
1003                        obd2cli_tgt(imp->imp_obd));
1004                 rc = ptlrpc_replay_next(imp, &inflight);
1005                 if (inflight == 0 &&
1006                     atomic_read(&imp->imp_replay_inflight) == 0) {
1007                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1008                         rc = ldlm_replay_locks(imp);
1009                         if (rc)
1010                                 GOTO(out, rc);
1011                 }
1012                 rc = 0;
1013         }
1014
1015         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1016                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1017                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1018                         rc = signal_completed_replay(imp);
1019                         if (rc)
1020                                 GOTO(out, rc);
1021                 }
1022
1023         }
1024
1025         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1026                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1027                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1028                 }
1029         }
1030
1031         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1032                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1033                        obd2cli_tgt(imp->imp_obd),
1034                        imp->imp_connection->c_remote_uuid.uuid);
1035
1036                 rc = ptlrpc_resend(imp);
1037                 if (rc)
1038                         GOTO(out, rc);
1039                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1040                 ptlrpc_activate_import(imp);
1041
1042                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1043                           &target_start, &target_len);
1044                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1045                               "using nid %s.\n", imp->imp_obd->obd_name,
1046                               target_len, target_start,
1047                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1048         }
1049
1050         if (imp->imp_state == LUSTRE_IMP_FULL) {
1051                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1052                 ptlrpc_wake_delayed(imp);
1053         }
1054
1055  out:
1056         RETURN(rc);
1057 }
1058
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() in
 * ptlrpc_disconnect_import().  It ignores its argument and always
 * returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1063
1064 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1065 {
1066         struct ptlrpc_request *req;
1067         int rq_opc, rc = 0;
1068         int nowait = imp->imp_obd->obd_force;
1069         ENTRY;
1070
1071         if (nowait)
1072                 GOTO(set_state, rc);
1073
1074         switch (imp->imp_connect_op) {
1075         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1076         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1077         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1078         default:
1079                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1080                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1081                 RETURN(-EINVAL);
1082         }
1083
1084         if (ptlrpc_import_in_recovery(imp)) {
1085                 struct l_wait_info lwi;
1086                 cfs_duration_t timeout;
1087
1088                 if (AT_OFF) {
1089                         timeout = cfs_time_seconds(obd_timeout);
1090                 } else {
1091                         int idx = import_at_get_index(imp, 
1092                                 imp->imp_client->cli_request_portal);
1093                         timeout = cfs_time_seconds(
1094                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1095                 }
1096                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
1097                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1098                 rc = l_wait_event(imp->imp_recovery_waitq,
1099                                   !ptlrpc_import_in_recovery(imp), &lwi);
1100         }
1101
1102         spin_lock(&imp->imp_lock);
1103         if (imp->imp_state != LUSTRE_IMP_FULL)
1104                 GOTO(out, 0);
1105
1106         spin_unlock(&imp->imp_lock);
1107
1108         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1109         if (req) {
1110                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1111                  * it fails.  We can get through the above with a down server
1112                  * if the client doesn't know the server is gone yet. */
1113                 req->rq_no_resend = 1;
1114                 
1115 #ifndef CRAY_XT3
1116                 /* We want client umounts to happen quickly, no matter the 
1117                    server state... */
1118                 req->rq_timeout = min_t(int, req->rq_timeout,
1119                                         INITIAL_CONNECT_TIMEOUT);
1120 #else
1121                 /* ... but we always want liblustre clients to nicely 
1122                    disconnect, so only use the adaptive value. */
1123                 if (AT_OFF)
1124                         req->rq_timeout = obd_timeout / 3;
1125 #endif
1126
1127                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1128                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1129                 ptlrpc_req_set_repsize(req, 1, NULL);
1130                 rc = ptlrpc_queue_wait(req);
1131                 ptlrpc_req_finished(req);
1132         }
1133
1134 set_state:
1135         spin_lock(&imp->imp_lock);
1136 out:
1137         if (noclose) 
1138                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1139         else
1140                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1141         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1142         /* Try all connections in the future - bz 12758 */ 
1143         imp->imp_last_recon = 0;
1144         spin_unlock(&imp->imp_lock);
1145
1146         RETURN(rc);
1147 }
1148
1149 /* Sets maximal number of RPCs possible originating from other side of this
1150    import (server) to us and number of async RPC replies that we are not waiting
1151    for arriving */
1152 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1153 {
1154         LNetSetAsync(imp->imp_connection->c_peer, count);
1155 }
1156
1157
1158 /* Adaptive Timeout utils */
1159 extern unsigned int at_min, at_max, at_history;
1160
1161 /* Bin into timeslices using AT_BINS bins.
1162    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1163    but still smoothing out a return to normalcy from a slow response.
1164    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1165 int at_add(struct adaptive_timeout *at, unsigned int val) 
1166 {
1167         unsigned int old = at->at_current;
1168         time_t now = cfs_time_current_sec();
1169         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1170
1171         LASSERT(at);
1172 #if 0
1173         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n", 
1174                val, at, now - at->at_binstart, at->at_current,
1175                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1176 #endif
1177         if (val == 0) 
1178                 /* 0's don't count, because we never want our timeout to 
1179                    drop to 0, and because 0 could mean an error */
1180                 return 0;
1181
1182         spin_lock(&at->at_lock);
1183
1184         if (unlikely(at->at_binstart == 0)) {
1185                 /* Special case to remove default from history */
1186                 at->at_current = val;
1187                 at->at_worst_ever = val;
1188                 at->at_worst_time = now;
1189                 at->at_hist[0] = val;
1190                 at->at_binstart = now;
1191         } else if (now - at->at_binstart < binlimit ) {
1192                 /* in bin 0 */
1193                 at->at_hist[0] = max(val, at->at_hist[0]);
1194                 at->at_current = max(val, at->at_current);
1195         } else {
1196                 int i, shift;
1197                 unsigned int maxv = val;
1198                 /* move bins over */
1199                 shift = (now - at->at_binstart) / binlimit;
1200                 LASSERT(shift > 0);
1201                 for(i = AT_BINS - 1; i >= 0; i--) {
1202                         if (i >= shift) {
1203                                 at->at_hist[i] = at->at_hist[i - shift];
1204                                 maxv = max(maxv, at->at_hist[i]);
1205                         } else {
1206                                 at->at_hist[i] = 0;
1207                         }
1208                 }
1209                 at->at_hist[0] = val;
1210                 at->at_current = maxv;
1211                 at->at_binstart += shift * binlimit;
1212         }
1213
1214         if (at->at_current > at->at_worst_ever) {
1215                 at->at_worst_ever = at->at_current;
1216                 at->at_worst_time = now;
1217         }
1218
1219         if (at->at_flags & AT_FLG_NOHIST)
1220                 /* Only keep last reported val; keeping the rest of the history
1221                    for proc only */
1222                 at->at_current = val;
1223
1224         if (at_max > 0)
1225                 at->at_current =  min(at->at_current, at_max);
1226         at->at_current =  max(at->at_current, at_min);
1227
1228 #if 0
1229         if (at->at_current != old)
1230                 CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
1231                        "(val=%u) hist %u %u %u %u\n", at,
1232                        old, at->at_current, at->at_current - old, val,
1233                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1234                        at->at_hist[3]);
1235 #endif
1236         
1237         /* if we changed, report the old value */
1238         old = (at->at_current != old) ? old : 0;
1239         
1240         spin_unlock(&at->at_lock);
1241         return old;
1242 }
1243
1244 /* Find the imp_at index for a given portal; assign if space available */
1245 int import_at_get_index(struct obd_import *imp, int portal) 
1246 {
1247         struct imp_at *at = &imp->imp_at;
1248         int i;
1249
1250         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1251                 if (at->iat_portal[i] == portal) 
1252                         return i;
1253                 if (at->iat_portal[i] == 0)
1254                         /* unused */
1255                         break;
1256         }
1257
1258         /* Not found in list, add it under a lock */
1259         spin_lock(&imp->imp_lock);
1260
1261         /* Check unused under lock */
1262         for (; i < IMP_AT_MAX_PORTALS; i++) {
1263                 if (at->iat_portal[i] == portal) 
1264                         goto out;
1265                 if (at->iat_portal[i] == 0)
1266                         /* unused */
1267                         break;
1268         }
1269         
1270         /* Not enough portals? */
1271         LASSERT(i < IMP_AT_MAX_PORTALS);
1272
1273         at->iat_portal[i] = portal;
1274 out:
1275         spin_unlock(&imp->imp_lock);
1276         return i;
1277 }
1278