Whamcloud - gitweb
LU-6086 obdclass: check peer's version for MDT-MDT connection
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 089f4cb..5e1f170 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -353,7 +353,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
        INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
        INIT_LIST_HEAD(&cli->cl_loi_write_list);
        INIT_LIST_HEAD(&cli->cl_loi_read_list);
-       client_obd_list_lock_init(&cli->cl_loi_list_lock);
+       spin_lock_init(&cli->cl_loi_list_lock);
        atomic_set(&cli->cl_pending_w_pages, 0);
        atomic_set(&cli->cl_pending_r_pages, 0);
        cli->cl_r_in_flight = 0;
@@ -372,7 +372,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
        atomic_long_set(&cli->cl_lru_busy, 0);
        atomic_long_set(&cli->cl_lru_in_list, 0);
        INIT_LIST_HEAD(&cli->cl_lru_list);
-       client_obd_list_lock_init(&cli->cl_lru_list_lock);
+       spin_lock_init(&cli->cl_lru_list_lock);
        atomic_long_set(&cli->cl_unstable_count, 0);
 
        init_waitqueue_head(&cli->cl_destroy_waitq);
@@ -508,7 +508,7 @@ int client_connect_import(const struct lu_env *env,
 
         *exp = NULL;
        down_write(&cli->cl_sem);
-        if (cli->cl_conn_count > 0 )
+       if (cli->cl_conn_count > 0)
                 GOTO(out_sem, rc = -EALREADY);
 
         rc = class_connect(&conn, obd, cluuid);
@@ -580,17 +580,17 @@ int client_disconnect_export(struct obd_export *exp)
         imp = cli->cl_import;
 
        down_write(&cli->cl_sem);
-        CDEBUG(D_INFO, "disconnect %s - %d\n", obd->obd_name,
-               cli->cl_conn_count);
+       CDEBUG(D_INFO, "disconnect %s - %zu\n", obd->obd_name,
+               cli->cl_conn_count);
 
-        if (!cli->cl_conn_count) {
+       if (cli->cl_conn_count == 0) {
                 CERROR("disconnecting disconnected device (%s)\n",
                        obd->obd_name);
                 GOTO(out_disconnect, rc = -EINVAL);
         }
 
         cli->cl_conn_count--;
-        if (cli->cl_conn_count)
+       if (cli->cl_conn_count != 0)
                 GOTO(out_disconnect, rc = 0);
 
        /* Mark import deactivated now, so we don't try to reconnect if any
@@ -900,6 +900,34 @@ int target_handle_connect(struct ptlrpc_request *req)
                mds_conn = (data->ocd_connect_flags & OBD_CONNECT_MDS) != 0;
                lw_client = (data->ocd_connect_flags &
                             OBD_CONNECT_LIGHTWEIGHT) != 0;
+
+               /* OBD_CONNECT_MNE_SWAB is defined as OBD_CONNECT_MDS_MDS
+                * for Imperative Recovery connection from MGC to MGS. */
+               if (!lw_client &&
+                   (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+                   !(data->ocd_connect_flags & OBD_CONNECT_IMP_RECOV) &&
+                   (data->ocd_connect_flags & OBD_CONNECT_VERSION)) {
+                       __u32 major = OBD_OCD_VERSION_MAJOR(data->ocd_version);
+                       __u32 minor = OBD_OCD_VERSION_MINOR(data->ocd_version);
+                       __u32 patch = OBD_OCD_VERSION_PATCH(data->ocd_version);
+
+                       /* We do not support the MDT-MDT interoperations with
+                        * different version MDT because of protocol changes. */
+                       if (unlikely(major != LUSTRE_MAJOR ||
+                                    minor != LUSTRE_MINOR ||
+                                    abs(patch - LUSTRE_PATCH) > 3)) {
+                               LCONSOLE_WARN("%s (%u.%u.%u.%u) refused the "
+                                       "connection from different version MDT "
+                                       "(%d.%d.%d.%d) %s %s\n",
+                                       target->obd_name, LUSTRE_MAJOR,
+                                       LUSTRE_MINOR, LUSTRE_PATCH, LUSTRE_FIX,
+                                       major, minor, patch,
+                                       OBD_OCD_VERSION_FIX(data->ocd_version),
+                                       libcfs_nid2str(req->rq_peer.nid), str);
+
+                               GOTO(out, rc = -EPROTO);
+                       }
+               }
        }
 
         /* lctl gets a backstage, all-access pass. */
@@ -1474,7 +1502,7 @@ void target_cleanup_recovery(struct obd_device *obd)
        spin_unlock(&obd->obd_recovery_task_lock);
 
        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
-               LASSERT(req->rq_reply_state == 0);
+               LASSERT(req->rq_reply_state == NULL);
                target_exp_dequeue_req_replay(req);
                target_request_copy_put(req);
        }
@@ -1485,7 +1513,7 @@ void target_cleanup_recovery(struct obd_device *obd)
        spin_unlock(&obd->obd_recovery_task_lock);
 
        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
-                LASSERT(req->rq_reply_state == 0);
+               LASSERT(req->rq_reply_state == NULL);
                 target_request_copy_put(req);
         }
 
@@ -1768,7 +1796,11 @@ repeat:
                obd->obd_abort_recovery = 1;
        }
 
-       wait_event(obd->obd_next_transno_waitq, check_routine(obd));
+       while (wait_event_timeout(obd->obd_next_transno_waitq,
+                                 check_routine(obd),
+                                 msecs_to_jiffies(60 * MSEC_PER_SEC)) == 0)
+               /* wait indefinitely for event, but don't trigger watchdog */;
+
        if (obd->obd_abort_recovery) {
                CWARN("recovery is aborted, evict exports in recovery\n");
                /** evict exports which didn't finish recovery yet */
@@ -2378,7 +2410,8 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
 }
 EXPORT_SYMBOL(target_pack_pool_reply);
 
-int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
+static int target_send_reply_msg(struct ptlrpc_request *req,
+                                int rc, int fail_id)
 {
         if (OBD_FAIL_CHECK_ORSET(fail_id & ~OBD_FAIL_ONCE, OBD_FAIL_ONCE)) {
                 DEBUG_REQ(D_ERROR, req, "dropping reply");