Whamcloud - gitweb
LU-3285 merge: 'dom' branch merging
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 383b391..b96396a 100644 (file)
@@ -64,18 +64,16 @@ struct kset *ldlm_svc_kset;
 
 static struct ldlm_state *ldlm_state;
 
-static inline cfs_time_t round_timeout(cfs_time_t timeout)
-{
-        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
-}
-
-/* timeout for initial callback (AST) reply (bz10399) */
-static inline unsigned int ldlm_get_rq_timeout(void)
+/* timeout for initial callback (AST) reply (bz10399)
+ * Due to having to send a 32 bit time value over the
+ * wire return it as time_t instead of time64_t
+ */
+static inline time_t ldlm_get_rq_timeout(void)
 {
-        /* Non-AT value */
-        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
+       /* Non-AT value */
+       time_t timeout = min(ldlm_timeout, obd_timeout / 3);
 
-        return timeout < 1 ? 1 : timeout;
+       return timeout < 1 ? 1 : timeout;
 }
 
 struct ldlm_bl_pool {
@@ -259,7 +257,7 @@ static int expired_lock_main(void *arg)
 }
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
 
 /**
  * Check if there is a request in the export request list
@@ -296,11 +294,10 @@ static void waiting_locks_callback(unsigned long unused)
        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
-                                      l_pending_chain);
-                if (cfs_time_after(lock->l_callback_timeout,
-                                   cfs_time_current()) ||
-                    (lock->l_req_mode == LCK_GROUP))
-                        break;
+                                 l_pending_chain);
+               if (lock->l_callback_timeout > ktime_get_seconds() ||
+                   lock->l_req_mode == LCK_GROUP)
+                       break;
 
                 /* Check if we need to prolong timeout */
                 if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
@@ -348,17 +345,18 @@ static void waiting_locks_callback(unsigned long unused)
                wake_up(&expired_lock_wait_queue);
        }
 
-        /*
-         * Make sure the timer will fire again if we have any locks
-         * left.
-         */
+       /*
+        * Make sure the timer will fire again if we have any locks
+        * left.
+        */
        if (!list_empty(&waiting_locks_list)) {
-                cfs_time_t timeout_rounded;
+               unsigned long timeout_jiffies;
+
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
-                                      l_pending_chain);
-                timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
-               mod_timer(&waiting_locks_timer, timeout_rounded);
-        }
+                                 l_pending_chain);
+               timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
+               mod_timer(&waiting_locks_timer, timeout_jiffies);
+       }
        spin_unlock_bh(&waiting_locks_spinlock);
 }
 
@@ -374,10 +372,10 @@ static void waiting_locks_callback(unsigned long unused)
  *
  * Called with the namespace lock held.
  */
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds)
 {
-        cfs_time_t timeout;
-        cfs_time_t timeout_rounded;
+       unsigned long timeout_jiffies;
+       time64_t timeout;
 
        if (!list_empty(&lock->l_pending_chain))
                 return 0;
@@ -386,21 +384,22 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
             OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                 seconds = 1;
 
-        timeout = cfs_time_shift(seconds);
-        if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
+       timeout = ktime_get_seconds() + seconds;
+       if (likely(timeout > lock->l_callback_timeout))
                 lock->l_callback_timeout = timeout;
 
-        timeout_rounded = round_timeout(lock->l_callback_timeout);
+       timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
 
-       if (cfs_time_before(timeout_rounded, waiting_locks_timer.expires) ||
-           !timer_pending(&waiting_locks_timer)) {
-               mod_timer(&waiting_locks_timer, timeout_rounded);
-        }
-        /* if the new lock has a shorter timeout than something earlier on
-           the list, we'll wait the longer amount of time; no big deal. */
-        /* FIFO */
+       if (time_before(timeout_jiffies, waiting_locks_timer.expires) ||
+           !timer_pending(&waiting_locks_timer))
+               mod_timer(&waiting_locks_timer, timeout_jiffies);
+
+       /* if the new lock has a shorter timeout than something earlier on
+        * the list, we'll wait the longer amount of time; no big deal.
+        */
+       /* FIFO */
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
-        return 1;
+       return 1;
 }
 
 static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
@@ -427,8 +426,8 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
+       time64_t timeout = ldlm_bl_timeout(lock);
        int ret;
-       int timeout = ldlm_bl_timeout(lock);
 
        /* NB: must be called with hold of lock_res_and_lock() */
        LASSERT(ldlm_is_res_locked(lock));
@@ -447,12 +446,12 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        }
 
        if (ldlm_is_destroyed(lock)) {
-               static cfs_time_t next;
+               static time64_t next;
 
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
-               if (cfs_time_after(cfs_time_current(), next)) {
-                       next = cfs_time_shift(14400);
+               if (ktime_get_seconds() > next) {
+                       next = ktime_get_seconds() + 14400;
                        libcfs_debug_dumpstack(NULL);
                }
                return 0;
@@ -471,7 +470,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        if (ret)
                ldlm_add_blocked_lock(lock);
 
-       LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
+       LDLM_DEBUG(lock, "%sadding to wait list(timeout: %lld, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
                   AT_OFF ? "off" : "on");
        return ret;
@@ -501,10 +500,11 @@ static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
                        del_timer(&waiting_locks_timer);
                 } else {
                         struct ldlm_lock *next;
+
                        next = list_entry(list_next, struct ldlm_lock,
-                                              l_pending_chain);
+                                         l_pending_chain);
                        mod_timer(&waiting_locks_timer,
-                                 round_timeout(next->l_callback_timeout));
+                                 cfs_time_seconds(next->l_callback_timeout));
                 }
         }
        list_del_init(&lock->l_pending_chain);
@@ -547,7 +547,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
  *
  * Called with namespace lock held.
  */
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
 {
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
@@ -587,7 +587,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
         RETURN(0);
 }
 
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
 {
         RETURN(0);
 }
@@ -605,9 +605,9 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
  *
  * \retval            timeout in seconds to wait for the client reply
  */
-unsigned int ldlm_bl_timeout(struct ldlm_lock *lock)
+time64_t ldlm_bl_timeout(struct ldlm_lock *lock)
 {
-       unsigned int timeout;
+       time64_t timeout;
 
        if (AT_OFF)
                return obd_timeout / 2;
@@ -617,7 +617,7 @@ unsigned int ldlm_bl_timeout(struct ldlm_lock *lock)
         * It would be nice to have some kind of "early reply" mechanism for
         * lock callbacks too... */
        timeout = at_get(&lock->l_export->exp_bl_lock_at);
-       return max(timeout + (timeout >> 1), ldlm_enqueue_min);
+       return max(timeout + (timeout >> 1), (time64_t)ldlm_enqueue_min);
 }
 EXPORT_SYMBOL(ldlm_bl_timeout);
 
@@ -654,14 +654,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
        struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
 
        if (!req->rq_replied || (rc && rc != -EINVAL)) {
-               if (lock->l_export && lock->l_export->exp_libclient) {
-                       LDLM_DEBUG(lock,
-                                  "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
-                                  ast_type, req, req->rq_xid,
-                                  libcfs_nid2str(peer.nid));
-                       ldlm_lock_cancel(lock);
-                       rc = -ERESTART;
-               } else if (ldlm_is_cancel(lock)) {
+               if (ldlm_is_cancel(lock)) {
                        LDLM_DEBUG(lock,
                                   "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
                                   ast_type, req, req->rq_xid,
@@ -780,8 +773,8 @@ static int ldlm_cb_interpret(const struct lu_env *env,
 
 static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
 {
-       struct ldlm_cb_async_args *ca   = data;
-       struct ldlm_lock          *lock = ca->ca_lock;
+       struct ldlm_cb_async_args *ca = data;
+       struct ldlm_lock *lock = ca->ca_lock;
 
        ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
 }
@@ -874,18 +867,18 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         ldlm_lock_reorder_req(lock);
 
-        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
-                                        &RQF_LDLM_BL_CALLBACK,
-                                        LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
+                                       &RQF_LDLM_BL_CALLBACK,
+                                       LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
-        ca = ptlrpc_req_async_args(req);
-        ca->ca_set_arg = arg;
-        ca->ca_lock = lock;
+       CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+       ca = ptlrpc_req_async_args(req);
+       ca->ca_set_arg = arg;
+       ca->ca_lock = lock;
 
-        req->rq_interpret_reply = ldlm_cb_interpret;
+       req->rq_interpret_reply = ldlm_cb_interpret;
 
        lock_res_and_lock(lock);
        if (ldlm_is_destroyed(lock)) {
@@ -992,21 +985,21 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                lvb_len = 0;
 
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
-        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
-        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
-        ca = ptlrpc_req_async_args(req);
-        ca->ca_set_arg = arg;
-        ca->ca_lock = lock;
+       CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+       ca = ptlrpc_req_async_args(req);
+       ca->ca_set_arg = arg;
+       ca->ca_lock = lock;
 
-        req->rq_interpret_reply = ldlm_cb_interpret;
-        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       req->rq_interpret_reply = ldlm_cb_interpret;
+       body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 
-        body->lock_handle[0] = lock->l_remote_handle;
+       body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = ldlm_flags_to_wire(flags);
         ldlm_lock2desc(lock, &body->lock_desc);
        if (lvb_len > 0) {
@@ -1120,9 +1113,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                *desc = *arg->gl_desc;
        }
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-        body->lock_handle[0] = lock->l_remote_handle;
-        ldlm_lock2desc(lock, &body->lock_desc);
+       body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       body->lock_handle[0] = lock->l_remote_handle;
+       ldlm_lock2desc(lock, &body->lock_desc);
 
        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
        ca = ptlrpc_req_async_args(req);
@@ -1400,34 +1393,9 @@ existing_lock:
                                 ldlm_add_waiting_lock(lock);
                 }
         }
-        /* Make sure we never ever grant usual metadata locks to liblustre
-           clients */
-        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
-            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
-             req->rq_export->exp_libclient) {
-               if (unlikely(!ldlm_is_cancel_on_block(lock) ||
-                             !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
-                        CERROR("Granting sync lock to libclient. "
-                              "req fl %d, rep fl %d, lock fl %#llx\n",
-                               dlm_req->lock_flags, dlm_rep->lock_flags,
-                               lock->l_flags);
-                        LDLM_ERROR(lock, "sync lock");
-                       if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
-                               struct ldlm_intent *it;
-
-                               it = req_capsule_client_get(&req->rq_pill,
-                                                           &RMF_LDLM_INTENT);
-                               if (it != NULL) {
-                                       CERROR("This is intent %s (%llu)\n",
-                                              ldlm_it2str(it->opc), it->opc);
-                               }
-                       }
-                }
-        }
+       unlock_res_and_lock(lock);
 
-        unlock_res_and_lock(lock);
-
-        EXIT;
+       EXIT;
  out:
         req->rq_status = rc ?: err; /* return either error - bug 11190 */
         if (!req->rq_packed_final) {
@@ -1775,7 +1743,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
        INIT_LIST_HEAD(&ast_list);
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
-               int to = cfs_time_seconds(1);
+               long to = cfs_time_seconds(1);
+
                while (to > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(to);
@@ -2946,7 +2915,7 @@ static int ldlm_setup(void)
         if (ldlm_state == NULL)
                 RETURN(-ENOMEM);
 
-       ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
+       ldlm_kobj = kobject_create_and_add("ldlm", &lustre_kset->kobj);
        if (!ldlm_kobj)
                GOTO(out, -ENOMEM);
 
@@ -3154,8 +3123,10 @@ static int ldlm_cleanup(void)
                kset_unregister(ldlm_ns_kset);
        if (ldlm_svc_kset)
                kset_unregister(ldlm_svc_kset);
-       if (ldlm_kobj)
+       if (ldlm_kobj) {
+               sysfs_remove_group(ldlm_kobj, &ldlm_attr_group);
                kobject_put(ldlm_kobj);
+       }
 
        ldlm_proc_cleanup();