Whamcloud - gitweb
LU-6086 obdclass: check peer's version for MDT-MDT connection
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 1c370e6..5e1f170 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else
-# include <liblustre.h>
-#endif
+#include <libcfs/libcfs.h>
 #include <obd.h>
 #include <obd_class.h>
 #include <lustre_dlm.h>
@@ -357,7 +353,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
        INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
        INIT_LIST_HEAD(&cli->cl_loi_write_list);
        INIT_LIST_HEAD(&cli->cl_loi_read_list);
-       client_obd_list_lock_init(&cli->cl_loi_list_lock);
+       spin_lock_init(&cli->cl_loi_list_lock);
        atomic_set(&cli->cl_pending_w_pages, 0);
        atomic_set(&cli->cl_pending_r_pages, 0);
        cli->cl_r_in_flight = 0;
@@ -373,11 +369,11 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
        /* lru for osc. */
        INIT_LIST_HEAD(&cli->cl_lru_osc);
        atomic_set(&cli->cl_lru_shrinkers, 0);
-       atomic_set(&cli->cl_lru_busy, 0);
-       atomic_set(&cli->cl_lru_in_list, 0);
+       atomic_long_set(&cli->cl_lru_busy, 0);
+       atomic_long_set(&cli->cl_lru_in_list, 0);
        INIT_LIST_HEAD(&cli->cl_lru_list);
-       client_obd_list_lock_init(&cli->cl_lru_list_lock);
-       atomic_set(&cli->cl_unstable_count, 0);
+       spin_lock_init(&cli->cl_lru_list_lock);
+       atomic_long_set(&cli->cl_unstable_count, 0);
 
        init_waitqueue_head(&cli->cl_destroy_waitq);
        atomic_set(&cli->cl_destroy_in_flight, 0);
@@ -512,7 +508,7 @@ int client_connect_import(const struct lu_env *env,
 
         *exp = NULL;
        down_write(&cli->cl_sem);
-        if (cli->cl_conn_count > 0 )
+       if (cli->cl_conn_count > 0)
                 GOTO(out_sem, rc = -EALREADY);
 
         rc = class_connect(&conn, obd, cluuid);
@@ -584,17 +580,17 @@ int client_disconnect_export(struct obd_export *exp)
         imp = cli->cl_import;
 
        down_write(&cli->cl_sem);
-        CDEBUG(D_INFO, "disconnect %s - %d\n", obd->obd_name,
-               cli->cl_conn_count);
+       CDEBUG(D_INFO, "disconnect %s - %zu\n", obd->obd_name,
+               cli->cl_conn_count);
 
-        if (!cli->cl_conn_count) {
+       if (cli->cl_conn_count == 0) {
                 CERROR("disconnecting disconnected device (%s)\n",
                        obd->obd_name);
                 GOTO(out_disconnect, rc = -EINVAL);
         }
 
         cli->cl_conn_count--;
-        if (cli->cl_conn_count)
+       if (cli->cl_conn_count != 0)
                 GOTO(out_disconnect, rc = 0);
 
        /* Mark import deactivated now, so we don't try to reconnect if any
@@ -758,17 +754,9 @@ void target_client_add_cb(struct obd_device *obd, __u64 transno, void *cb_data,
 }
 EXPORT_SYMBOL(target_client_add_cb);
 
-#ifdef __KERNEL__
 static void
 check_and_start_recovery_timer(struct obd_device *obd,
                                struct ptlrpc_request *req, int new_client);
-#else
-static inline void
-check_and_start_recovery_timer(struct obd_device *obd,
-                               struct ptlrpc_request *req, int new_client)
-{
-}
-#endif
 
 int target_handle_connect(struct ptlrpc_request *req)
 {
@@ -912,6 +900,34 @@ int target_handle_connect(struct ptlrpc_request *req)
                mds_conn = (data->ocd_connect_flags & OBD_CONNECT_MDS) != 0;
                lw_client = (data->ocd_connect_flags &
                             OBD_CONNECT_LIGHTWEIGHT) != 0;
+
+               /* OBD_CONNECT_MNE_SWAB is defined as OBD_CONNECT_MDS_MDS
+                * for Imperative Recovery connection from MGC to MGS. */
+               if (!lw_client &&
+                   (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+                   !(data->ocd_connect_flags & OBD_CONNECT_IMP_RECOV) &&
+                   (data->ocd_connect_flags & OBD_CONNECT_VERSION)) {
+                       __u32 major = OBD_OCD_VERSION_MAJOR(data->ocd_version);
+                       __u32 minor = OBD_OCD_VERSION_MINOR(data->ocd_version);
+                       __u32 patch = OBD_OCD_VERSION_PATCH(data->ocd_version);
+
+                       /* We do not support the MDT-MDT interoperations with
+                        * different version MDT because of protocol changes. */
+                       if (unlikely(major != LUSTRE_MAJOR ||
+                                    minor != LUSTRE_MINOR ||
+                                    abs(patch - LUSTRE_PATCH) > 3)) {
+                               LCONSOLE_WARN("%s (%u.%u.%u.%u) refused the "
+                                       "connection from different version MDT "
+                                       "(%d.%d.%d.%d) %s %s\n",
+                                       target->obd_name, LUSTRE_MAJOR,
+                                       LUSTRE_MINOR, LUSTRE_PATCH, LUSTRE_FIX,
+                                       major, minor, patch,
+                                       OBD_OCD_VERSION_FIX(data->ocd_version),
+                                       libcfs_nid2str(req->rq_peer.nid), str);
+
+                               GOTO(out, rc = -EPROTO);
+                       }
+               }
        }
 
         /* lctl gets a backstage, all-access pass. */
@@ -1069,25 +1085,11 @@ dont_check_exports:
                 rc = obd_reconnect(req->rq_svc_thread->t_env,
                                    export, target, &cluuid, data, client_nid);
         }
-        if (rc)
-                GOTO(out, rc);
-
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 6, 52, 0)
-       /* 2.2.0 clients always swab nidtbl entries due to a bug, so server
-        * will do the swabbing for if the client is using the same endianness.
-        *
-        * This fixup is version-limited, because we don't want to carry the
-        * OBD_CONNECT_MNE_SWAB flag around forever, just so long as we need
-        * interop with unpatched 2.2 clients.  For newer clients, servers
-        * will never do MNE swabbing, let the client handle that.  LU-1644 */
-       export->exp_need_mne_swab = !ptlrpc_req_need_swab(req) &&
-                       !(data->ocd_connect_flags & OBD_CONNECT_MNE_SWAB);
-#else
-#warning "LU-1644: Remove old OBD_CONNECT_MNE_SWAB fixup and exp_need_mne_swab"
-#endif
+       if (rc)
+               GOTO(out, rc);
 
-        LASSERT(target->u.obt.obt_magic == OBT_MAGIC);
-        data->ocd_instance = target->u.obt.obt_instance;
+       LASSERT(target->u.obt.obt_magic == OBT_MAGIC);
+       data->ocd_instance = target->u.obt.obt_instance;
 
         /* Return only the parts of obd_connect_data that we understand, so the
          * client knows that we don't understand the rest. */
@@ -1382,7 +1384,6 @@ static void target_exp_dequeue_req_replay(struct ptlrpc_request *req)
        spin_unlock(&req->rq_export->exp_lock);
 }
 
-#ifdef __KERNEL__
 static void target_finish_recovery(struct obd_device *obd)
 {
         ENTRY;
@@ -1501,7 +1502,7 @@ void target_cleanup_recovery(struct obd_device *obd)
        spin_unlock(&obd->obd_recovery_task_lock);
 
        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
-               LASSERT(req->rq_reply_state == 0);
+               LASSERT(req->rq_reply_state == NULL);
                target_exp_dequeue_req_replay(req);
                target_request_copy_put(req);
        }
@@ -1512,7 +1513,7 @@ void target_cleanup_recovery(struct obd_device *obd)
        spin_unlock(&obd->obd_recovery_task_lock);
 
        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
-                LASSERT(req->rq_reply_state == 0);
+               LASSERT(req->rq_reply_state == NULL);
                 target_request_copy_put(req);
         }
 
@@ -1590,22 +1591,20 @@ static void extend_recovery_timer(struct obd_device *obd, int drt, bool extend)
                 to += drt - left;
         } else if (!extend && (drt > to)) {
                 to = drt;
-                /* reduce drt by already passed time */
-                drt -= obd->obd_recovery_timeout - left;
         }
 
         if (to > obd->obd_recovery_time_hard)
                 to = obd->obd_recovery_time_hard;
-       if (obd->obd_recovery_timeout < to ||
-           obd->obd_recovery_timeout == obd->obd_recovery_time_hard) {
+       if (obd->obd_recovery_timeout < to) {
                 obd->obd_recovery_timeout = to;
-                cfs_timer_arm(&obd->obd_recovery_timer,
-                              cfs_time_shift(drt));
+               end = obd->obd_recovery_start + to;
+               cfs_timer_arm(&obd->obd_recovery_timer,
+                               cfs_time_shift(end - now));
         }
        spin_unlock(&obd->obd_dev_lock);
 
        CDEBUG(D_HA, "%s: recovery timer will expire in %u seconds\n",
-              obd->obd_name, (unsigned)drt);
+               obd->obd_name, (unsigned)cfs_time_sub(end, now));
 }
 
 /* Reset the timer with each new client connection */
@@ -1791,7 +1790,17 @@ static int target_recovery_overseer(struct obd_device *obd,
                                    int (*health_check)(struct obd_export *))
 {
 repeat:
-       wait_event(obd->obd_next_transno_waitq, check_routine(obd));
+       if ((obd->obd_recovery_start != 0) && (cfs_time_current_sec() >=
+             (obd->obd_recovery_start + obd->obd_recovery_time_hard))) {
+               CWARN("recovery is aborted by hard timeout\n");
+               obd->obd_abort_recovery = 1;
+       }
+
+       while (wait_event_timeout(obd->obd_next_transno_waitq,
+                                 check_routine(obd),
+                                 msecs_to_jiffies(60 * MSEC_PER_SEC)) == 0)
+               /* wait indefinitely for event, but don't trigger watchdog */;
+
        if (obd->obd_abort_recovery) {
                CWARN("recovery is aborted, evict exports in recovery\n");
                /** evict exports which didn't finish recovery yet */
@@ -1821,17 +1830,24 @@ repeat:
 
 static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
 {
-        struct ptlrpc_request *req = NULL;
-        ENTRY;
+       struct ptlrpc_request *req = NULL;
+       ENTRY;
 
-        CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
-               obd->obd_next_recovery_transno);
+       CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
+               obd->obd_next_recovery_transno);
 
-        if (target_recovery_overseer(obd, check_for_next_transno,
-                                     exp_req_replay_healthy)) {
-                abort_req_replay_queue(obd);
-                abort_lock_replay_queue(obd);
-        }
+       CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLAY_DELAY2, cfs_fail_val);
+       /** It is needed to extend recovery window above recovery_time_soft.
+        *  Extending is possible only in the end of recovery window
+        *  (see more details in handle_recovery_req).
+        */
+       CFS_FAIL_TIMEOUT_MS(OBD_FAIL_TGT_REPLAY_DELAY, 300);
+
+       if (target_recovery_overseer(obd, check_for_next_transno,
+                                    exp_req_replay_healthy)) {
+               abort_req_replay_queue(obd);
+               abort_lock_replay_queue(obd);
+       }
 
        spin_lock(&obd->obd_recovery_task_lock);
        if (!list_empty(&obd->obd_req_replay_queue)) {
@@ -2044,6 +2060,7 @@ static int target_recovery_thread(void *arg)
          * The third stage: reply on final pings, at this moment all clients
          * must have request in final queue
          */
+       CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLAY_RECONNECT, cfs_fail_val);
         CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
         /** Update server last boot epoch */
         tgt_boot_epoch_update(lut);
@@ -2066,9 +2083,9 @@ static int target_recovery_thread(void *arg)
                 * export is being evicted */
                ptlrpc_update_export_timer(req->rq_export, 0);
                target_request_copy_put(req);
-        }
+       }
 
-       delta = (jiffies - delta) / HZ;
+       delta = jiffies_to_msecs(jiffies - delta) / MSEC_PER_SEC;
        CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n",
              delta, obd->obd_replayed_requests, obd->obd_replayed_locks);
        if (delta > OBD_RECOVERY_TIME_SOFT) {
@@ -2171,7 +2188,6 @@ void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
 }
 EXPORT_SYMBOL(target_recovery_init);
 
-#endif /* __KERNEL__ */
 
 static int target_process_req_flags(struct obd_device *obd,
                                     struct ptlrpc_request *req)
@@ -2394,7 +2410,8 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
 }
 EXPORT_SYMBOL(target_pack_pool_reply);
 
-int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
+static int target_send_reply_msg(struct ptlrpc_request *req,
+                                int rc, int fail_id)
 {
         if (OBD_FAIL_CHECK_ORSET(fail_id & ~OBD_FAIL_ONCE, OBD_FAIL_ONCE)) {
                 DEBUG_REQ(D_ERROR, req, "dropping reply");
@@ -2526,6 +2543,7 @@ int ldlm_error2errno(ldlm_error_t error)
 
         switch (error) {
         case ELDLM_OK:
+       case ELDLM_LOCK_MATCHED:
                 result = 0;
                 break;
         case ELDLM_LOCK_CHANGED:
@@ -2631,6 +2649,7 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
 {
        struct ptlrpc_request   *req = desc->bd_req;
        time_t                   start = cfs_time_current_sec();
+       time_t                   deadline;
        int                      rc = 0;
 
        ENTRY;
@@ -2640,7 +2659,7 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
                *lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
                                  !atomic_read(&exp->exp_obd->
-                                                  obd_evict_inprogress),
+                                                  obd_evict_inprogress),
                                  lwi);
        }
 
@@ -2666,10 +2685,16 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
                RETURN(0);
        }
 
+       /* limit actual bulk transfer to bulk_timeout seconds */
+       deadline = start + bulk_timeout;
+       if (deadline > req->rq_deadline)
+               deadline = req->rq_deadline;
+
        do {
-               long timeoutl = req->rq_deadline - cfs_time_current_sec();
+               long timeoutl = deadline - cfs_time_current_sec();
                cfs_duration_t timeout = timeoutl <= 0 ?
                                         CFS_TICK : cfs_time_seconds(timeoutl);
+               time_t  rq_deadline;
 
                *lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
                                            target_bulk_timeout, desc);
@@ -2680,14 +2705,18 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
                                  lustre_msg_get_conn_cnt(req->rq_reqmsg),
                                  lwi);
                LASSERT(rc == 0 || rc == -ETIMEDOUT);
-               /* Wait again if we changed deadline. */
+               /* Wait again if we changed rq_deadline. */
+               rq_deadline = ACCESS_ONCE(req->rq_deadline);
+               deadline = start + bulk_timeout;
+               if (deadline > rq_deadline)
+                       deadline = rq_deadline;
        } while ((rc == -ETIMEDOUT) &&
-                (req->rq_deadline > cfs_time_current_sec()));
+                (deadline > cfs_time_current_sec()));
 
        if (rc == -ETIMEDOUT) {
                DEBUG_REQ(D_ERROR, req, "timeout on bulk %s after %ld%+lds",
-                         bulk2type(desc), req->rq_deadline - start,
-                         cfs_time_current_sec() - req->rq_deadline);
+                         bulk2type(desc), deadline - start,
+                         cfs_time_current_sec() - deadline);
                ptlrpc_abort_bulk(desc);
        } else if (exp->exp_failed) {
                DEBUG_REQ(D_ERROR, req, "Eviction on bulk %s",