Whamcloud - gitweb
LU-14930 mdt: abort_recov_mdt shouldn't abort client recovery
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index bd2c02a..64214e2 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 /**
@@ -141,6 +140,52 @@ int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
 }
 EXPORT_SYMBOL(client_import_add_conn);
 
+int client_import_dyn_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
+                              lnet_nid_t prim_nid, int priority)
+{
+       struct ptlrpc_connection *ptlrpc_conn;
+       int rc;
+
+       ptlrpc_conn = ptlrpc_uuid_to_connection(uuid, prim_nid);
+       if (!ptlrpc_conn) {
+               const char *str_uuid = obd_uuid2str(uuid);
+
+               rc = class_add_uuid(str_uuid, prim_nid);
+               if (rc) {
+                       CERROR("%s: failed to add UUID '%s': rc = %d\n",
+                              imp->imp_obd->obd_name, str_uuid, rc);
+                       return rc;
+               }
+       }
+       return import_set_conn(imp, uuid, priority, 1);
+}
+EXPORT_SYMBOL(client_import_dyn_add_conn);
+
+int client_import_add_nids_to_conn(struct obd_import *imp, lnet_nid_t *nids,
+                                  int nid_count, struct obd_uuid *uuid)
+{
+       struct obd_import_conn *conn;
+       int rc = -ENOENT;
+
+       ENTRY;
+       if (nid_count <= 0 || !nids)
+               return rc;
+
+       spin_lock(&imp->imp_lock);
+       list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
+               if (class_check_uuid(&conn->oic_uuid, nids[0])) {
+                       *uuid = conn->oic_uuid;
+                       spin_unlock(&imp->imp_lock);
+                       rc = class_add_nids_to_uuid(&conn->oic_uuid, nids,
+                                                   nid_count);
+                       RETURN(rc);
+               }
+       }
+       spin_unlock(&imp->imp_lock);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(client_import_add_nids_to_conn);
+
 int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
 {
        struct obd_import_conn *imp_conn;
@@ -496,7 +541,7 @@ int client_obd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        if (lustre_cfg_buf(lcfg, 4)) {
                __u32 refnet = libcfs_str2net(lustre_cfg_string(lcfg, 4));
 
-               if (refnet == LNET_NIDNET(LNET_NID_ANY)) {
+               if (refnet == LNET_NET_ANY) {
                        rc = -EINVAL;
                        CERROR("%s: bad mount option 'network=%s': rc = %d\n",
                               obd->obd_name, lustre_cfg_string(lcfg, 4),
@@ -533,10 +578,12 @@ int client_obd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                                LDLM_NAMESPACE_CLIENT,
                                                LDLM_NAMESPACE_GREEDY,
                                                ns_type);
-       if (obd->obd_namespace == NULL) {
-               CERROR("Unable to create client namespace - %s\n",
-                      obd->obd_name);
-               GOTO(err_import, rc = -ENOMEM);
+       if (IS_ERR(obd->obd_namespace)) {
+               rc = PTR_ERR(obd->obd_namespace);
+               CERROR("%s: unable to create client namespace: rc = %d\n",
+                      obd->obd_name, rc);
+               obd->obd_namespace = NULL;
+               GOTO(err_import, rc);
        }
 
        RETURN(rc);
@@ -550,8 +597,8 @@ err:
                OBD_FREE(cli->cl_mod_tag_bitmap,
                         BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
        cli->cl_mod_tag_bitmap = NULL;
-       RETURN(rc);
 
+       RETURN(rc);
 }
 EXPORT_SYMBOL(client_obd_setup);
 
@@ -795,7 +842,7 @@ static inline int target_check_recovery_timer(struct obd_device *target)
        if (!target->obd_recovering || target->obd_recovery_start == 0)
                return 0;
 
-       remaining = hrtimer_expires_remaining(&target->obd_recovery_timer);
+       remaining = hrtimer_get_remaining(&target->obd_recovery_timer);
        timeout = ktime_divns(remaining, NSEC_PER_SEC);
        if (timeout > -30)
                return 0;
@@ -863,7 +910,7 @@ static int target_handle_reconnect(struct lustre_handle *conn,
                GOTO(out_already, rc);
        }
 
-       remaining = hrtimer_expires_remaining(&target->obd_recovery_timer);
+       remaining = hrtimer_get_remaining(&target->obd_recovery_timer);
        timeout = ktime_divns(remaining, NSEC_PER_SEC);
        if (timeout > 0) {
                LCONSOLE_WARN("%s: Client %s (at %s) reconnected, waiting for %d clients in recovery for %lld:%.02lld\n",
@@ -936,6 +983,7 @@ static int rev_import_flags_update(struct obd_import *revimp,
 
        revimp->imp_msghdr_flags |= MSGHDR_CKSUM_INCOMPAT18;
 
+       revimp->imp_connect_data = *data;
        rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx, &req->rq_flvr);
        if (rc) {
                CERROR("%s: cannot get reverse import %s security: rc = %d\n",
@@ -997,8 +1045,7 @@ static int rev_import_reconnect(struct obd_export *exp,
        /* avoid sending a request until import flags are changed */
        ptlrpc_import_enter_resend(revimp);
 
-       if (revimp->imp_connection != NULL)
-               ptlrpc_connection_put(revimp->imp_connection);
+       ptlrpc_connection_put(revimp->imp_connection);
 
        /*
         * client from recovery don't have a handle so we need to take from
@@ -1051,6 +1098,7 @@ int target_handle_connect(struct ptlrpc_request *req)
        struct obd_connect_data *data, *tmpdata;
        int size, tmpsize;
        lnet_nid_t *client_nid = NULL;
+       struct ptlrpc_connection *pcon = NULL;
 
        ENTRY;
 
@@ -1156,13 +1204,12 @@ int target_handle_connect(struct ptlrpc_request *req)
         * processing, so we needs to allow lw_client to be connected at
         * anytime, instead of only the initial connection
         */
-       lw_client = (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0;
+       lw_client = OCD_HAS_FLAG(data, LIGHTWEIGHT);
 
        if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL) {
                initial_conn = true;
-               mds_conn = (data->ocd_connect_flags & OBD_CONNECT_MDS) != 0;
-               mds_mds_conn = (data->ocd_connect_flags &
-                               OBD_CONNECT_MDS_MDS) != 0;
+               mds_conn = OCD_HAS_FLAG(data, MDS);
+               mds_mds_conn = OCD_HAS_FLAG(data, MDS_MDS);
 
                /*
                 * OBD_CONNECT_MNE_SWAB is removed at 2.14
@@ -1220,26 +1267,27 @@ int target_handle_connect(struct ptlrpc_request *req)
                export = NULL;
                rc = -EALREADY;
        } else if ((mds_conn || (lw_client && initial_conn) ||
-                  data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
-                  export->exp_connection != NULL) {
+                  OCD_HAS_FLAG(data, MDS_MDS)) && export->exp_connection) {
                spin_unlock(&export->exp_lock);
                if (req->rq_peer.nid != export->exp_connection->c_peer.nid) {
                        /* MDS or LWP reconnected after failover. */
                        LCONSOLE_WARN("%s: Received %s connection from %s, removing former export from %s\n",
                                      target->obd_name,
-                                     mds_conn ? "MDS" : "LWP",
+                                     lw_client ? "LWP" : "MDS",
                                      libcfs_nid2str(req->rq_peer.nid),
                                      libcfs_nid2str(export->exp_connection->c_peer.nid));
                } else {
-                       /* New MDS connection from the same NID. */
-                       LCONSOLE_WARN("%s: Received new %s connection from %s, removing former export from same NID\n",
+                       /* New connection from the same NID. */
+                       LCONSOLE_WARN("%s: Received new %s connection from %s, %s former export from same NID\n",
                                      target->obd_name,
-                                     mds_conn ? "MDS" : "LWP",
-                                     libcfs_nid2str(req->rq_peer.nid));
+                                     lw_client ? "LWP" : "MDS",
+                                     libcfs_nid2str(req->rq_peer.nid),
+                                     OCD_HAS_FLAG(data, MDS_MDS) ?
+                                     "keep" : "remove");
                }
 
                if (req->rq_peer.nid == export->exp_connection->c_peer.nid &&
-                   data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) {
+                   OCD_HAS_FLAG(data, MDS_MDS)) {
                        /*
                         * Because exports between MDTs will always be
                         * kept, let's do not fail such export if they
@@ -1297,7 +1345,7 @@ no_export:
        } else if (lustre_msg_get_conn_cnt(req->rq_reqmsg) == 1 &&
                   rc != EALREADY) {
                if (!strstr(cluuid.uuid, "mdt"))
-                       LCONSOLE_WARN("%s: Rejecting reconnect from the known client %s (at %s) because it is indicating it is a new client",
+                       LCONSOLE_WARN("%s: Rejecting reconnect from the known client %s (at %s) because it is indicating it is a new client\n",
                                      target->obd_name, cluuid.uuid,
                                      libcfs_nid2str(req->rq_peer.nid));
                GOTO(out, rc = -EALREADY);
@@ -1358,7 +1406,7 @@ no_export:
                        known =
                           atomic_read(&target->obd_max_recoverable_clients);
                        stale = target->obd_stale_clients;
-                       remaining = hrtimer_expires_remaining(timer);
+                       remaining = hrtimer_get_remaining(timer);
                        left = ktime_divns(remaining, NSEC_PER_SEC);
 
                        if (ktime_to_ns(remaining) > 0) {
@@ -1430,7 +1478,16 @@ dont_check_exports:
         */
        ptlrpc_request_change_export(req, export);
 
+       pcon = ptlrpc_connection_get(req->rq_peer, req->rq_self, &cluuid);
+       if (pcon == NULL)
+               GOTO(out, rc = -ENOTCONN);
+
        spin_lock(&export->exp_lock);
+
+       if (export->exp_disconnected) {
+               spin_unlock(&export->exp_lock);
+               GOTO(out, rc = -ENODEV);
+       }
        if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
                spin_unlock(&export->exp_lock);
                CDEBUG(D_RPCTRACE,
@@ -1443,24 +1500,23 @@ dont_check_exports:
        }
        LASSERT(lustre_msg_get_conn_cnt(req->rq_reqmsg) > 0);
        export->exp_conn_cnt = lustre_msg_get_conn_cnt(req->rq_reqmsg);
-       spin_unlock(&export->exp_lock);
-
-       if (export->exp_connection != NULL) {
-               /* Check to see if connection came from another NID. */
-               if (export->exp_connection->c_peer.nid != req->rq_peer.nid)
-                       obd_nid_del(export->exp_obd, export);
 
+       /* Check to see if connection came from another NID. */
+       if (export->exp_connection != NULL &&
+           export->exp_connection->c_peer.nid != req->rq_peer.nid) {
+               obd_nid_del(export->exp_obd, export);
                ptlrpc_connection_put(export->exp_connection);
+               export->exp_connection = NULL;
        }
 
-       export->exp_connection = ptlrpc_connection_get(req->rq_peer,
-                                                      req->rq_self,
-                                                      &cluuid);
-       if (!export->exp_connection)
-               GOTO(out, rc = -ENOTCONN);
-
+       if (export->exp_connection == NULL) {
+               export->exp_connection = pcon;
+               pcon = NULL;
+       }
        obd_nid_add(export->exp_obd, export);
 
+       spin_unlock(&export->exp_lock);
+
        lustre_msg_set_handle(req->rq_repmsg, &conn);
 
        rc = rev_import_reconnect(export, req);
@@ -1536,6 +1592,8 @@ out:
                spin_unlock(&target->obd_dev_lock);
                class_decref(target, "find", current);
        }
+       if (pcon)
+               ptlrpc_connection_put(pcon);
        req->rq_status = rc;
        RETURN(rc);
 }
@@ -1550,6 +1608,18 @@ int target_handle_disconnect(struct ptlrpc_request *req)
        if (rc)
                RETURN(rc);
 
+       /* In case of target disconnect, updating sec ctx immediately is
+        * required in order to record latest sequence number used.
+        * Sequence is normally updated on export destroy, but this event
+        * can occur too late, ie after a new target connect request has
+        * been processed.
+        * Maintaining correct sequence when client connection becomes idle
+        * ensures that GSS does not erroneously consider requests as replays.
+        */
+       rc = sptlrpc_export_update_ctx(req->rq_export);
+       if (rc)
+               RETURN(rc);
+
        /* Keep the rq_export around so we can send the reply. */
        req->rq_status = obd_disconnect(class_export_get(req->rq_export));
 
@@ -1679,6 +1749,8 @@ static void target_finish_recovery(struct lu_target *lut)
                              atomic_read(&obd->obd_connected_clients),
                              obd->obd_stale_clients,
                              obd->obd_stale_clients == 1 ? "was" : "were");
+               if (obd->obd_stale_clients && do_dump_on_eviction(obd))
+                       libcfs_debug_dumplog();
        }
 
        ldlm_reprocess_recovery_done(obd->obd_namespace);
@@ -1771,6 +1843,7 @@ void target_cleanup_recovery(struct obd_device *obd)
                return;
        }
        obd->obd_recovering = obd->obd_abort_recovery = 0;
+       obd->obd_abort_recov_mdt = 0;
        spin_unlock(&obd->obd_dev_lock);
 
        spin_lock(&obd->obd_recovery_task_lock);
@@ -1864,7 +1937,7 @@ static void extend_recovery_timer(struct obd_device *obd, timeout_t dr_timeout,
        }
        LASSERT(obd->obd_recovery_start != 0);
 
-       left_ns = hrtimer_expires_remaining(&obd->obd_recovery_timer);
+       left_ns = hrtimer_get_remaining(&obd->obd_recovery_timer);
        left = ktime_divns(left_ns, NSEC_PER_SEC);
 
        if (extend) {
@@ -2017,7 +2090,7 @@ static int check_for_next_transno(struct lu_target *lut)
                req_transno = lustre_msg_get_transno(req->rq_reqmsg);
        }
 
-       if (tdtd != NULL)
+       if (!obd->obd_abort_recov_mdt && tdtd)
                update_transno = distribute_txn_get_next_transno(tdtd);
 
        connected = atomic_read(&obd->obd_connected_clients);
@@ -2037,7 +2110,7 @@ static int check_for_next_transno(struct lu_target *lut)
        } else if (obd->obd_recovery_expired) {
                CDEBUG(D_HA, "waking for expired recovery\n");
                wake_up = 1;
-       } else if (tdtd != NULL && req != NULL &&
+       } else if (!obd->obd_abort_recov_mdt && tdtd && req &&
                   is_req_replayed_by_update(req)) {
                LASSERTF(req_transno < next_transno,
                         "req_transno %llu next_transno%llu\n", req_transno,
@@ -2149,7 +2222,7 @@ repeat:
                 * left in the queue
                 */
                spin_lock(&obd->obd_recovery_task_lock);
-               if (lut->lut_tdtd != NULL) {
+               if (!obd->obd_abort_recov_mdt && lut->lut_tdtd) {
                        next_update_transno =
                                distribute_txn_get_next_transno(lut->lut_tdtd);
 
@@ -2222,8 +2295,7 @@ repeat:
                /** evict exports which didn't finish recovery yet */
                class_disconnect_stale_exports(obd, exp_finished);
                return 1;
-       } else if (obd->obd_recovery_expired &&
-                  obd->obd_recovery_timeout < obd->obd_recovery_time_hard) {
+       } else if (obd->obd_recovery_expired) {
                obd->obd_recovery_expired = 0;
 
                /** If some clients died being recovered, evict them */
@@ -2377,7 +2449,7 @@ static int check_for_recovery_ready(struct lu_target *lut)
                        return 0;
        }
 
-       if (lut->lut_tdtd != NULL) {
+       if (!obd->obd_abort_recov_mdt && lut->lut_tdtd != NULL) {
                if (!lut->lut_tdtd->tdtd_replay_ready &&
                    !obd->obd_abort_recovery && !obd->obd_stopping) {
                        /*
@@ -2429,7 +2501,7 @@ static __u64 get_next_transno(struct lu_target *lut, int *type)
        if (type != NULL)
                *type = REQUEST_RECOVERY;
 
-       if (tdtd == NULL)
+       if (!tdtd || obd->obd_abort_recov_mdt)
                RETURN(transno);
 
        update_transno = distribute_txn_get_next_transno(tdtd);
@@ -2641,8 +2713,6 @@ static int target_recovery_thread(void *arg)
        int rc = 0;
 
        ENTRY;
-
-       unshare_fs_struct();
        OBD_ALLOC_PTR(thread);
        if (thread == NULL)
                RETURN(-ENOMEM);
@@ -2660,6 +2730,7 @@ static int target_recovery_thread(void *arg)
 
        thread->t_env = env;
        thread->t_id = -1; /* force filter_iobuf_get/put to use local buffers */
+       thread->t_task = current;
        env->le_ctx.lc_thread = thread;
        tgt_io_thread_init(thread); /* init thread_big_cache for IO requests */
 
@@ -2697,6 +2768,11 @@ static int target_recovery_thread(void *arg)
                LASSERT(trd->trd_processing_task == current->pid);
                DEBUG_REQ(D_HA, req, "processing lock from %s:",
                          libcfs_nid2str(req->rq_peer.nid));
+               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_LOCK_REPLAY)) {
+                       req->rq_status = -ENODEV;
+                       target_request_copy_put(req);
+                       continue;
+               }
                handle_recovery_req(thread, req,
                                    trd->trd_recovery_handler);
                target_request_copy_put(req);
@@ -2717,6 +2793,7 @@ static int target_recovery_thread(void *arg)
         */
        spin_lock(&obd->obd_dev_lock);
        obd->obd_recovering = obd->obd_abort_recovery = 0;
+       obd->obd_abort_recov_mdt = 0;
        spin_unlock(&obd->obd_dev_lock);
        spin_lock(&obd->obd_recovery_task_lock);
        target_cancel_recovery_timer(obd);