Whamcloud - gitweb
LU-13456 ldlm: fix reprocessing of locks with more bits
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 9b16cb6..03ca051 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ldlm/ldlm_lockd.c
  *
@@ -255,11 +254,12 @@ static int expired_lock_main(void *arg)
 
                                LDLM_ERROR(lock,
                                           "lock callback timer expired after %llds: evicting client at %s ",
-                                          ktime_get_real_seconds() -
+                                          ktime_get_seconds() -
                                           lock->l_blast_sent,
                                           obd_export_nid2str(export));
                                ldlm_lock_to_ns(lock)->ns_timeouts++;
-                               do_dump++;
+                               if (do_dump_on_eviction(export->exp_obd))
+                                       do_dump++;
                                class_fail_export(export);
                        }
                        class_export_lock_put(export, lock);
@@ -273,7 +273,7 @@ static int expired_lock_main(void *arg)
                }
                spin_unlock_bh(&waiting_locks_spinlock);
 
-               if (do_dump && obd_dump_on_eviction) {
+               if (do_dump) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }
@@ -377,10 +377,10 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
 {
        unsigned long timeout_jiffies = jiffies;
-       time64_t now = ktime_get_seconds();
        time64_t deadline;
        timeout_t timeout;
 
+       lock->l_blast_sent = ktime_get_seconds();
        if (!list_empty(&lock->l_pending_chain))
                return 0;
 
@@ -388,11 +388,12 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                delay = 1;
 
-       deadline = now + delay;
+       deadline = lock->l_blast_sent + delay;
        if (likely(deadline > lock->l_callback_timestamp))
                lock->l_callback_timestamp = deadline;
 
-       timeout = clamp_t(timeout_t, lock->l_callback_timestamp - now,
+       timeout = clamp_t(timeout_t,
+                         lock->l_callback_timestamp - lock->l_blast_sent,
                          0, delay);
        timeout_jiffies += cfs_time_seconds(timeout);
 
@@ -468,7 +469,6 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
        }
 
        ldlm_set_waited(lock);
-       lock->l_blast_sent = ktime_get_real_seconds();
        ret = __ldlm_add_waiting_lock(lock, timeout);
        if (ret) {
                /*
@@ -596,7 +596,7 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
        __ldlm_add_waiting_lock(lock, timeout);
        spin_unlock_bh(&waiting_locks_spinlock);
 
-       LDLM_DEBUG(lock, "refreshed");
+       LDLM_DEBUG(lock, "refreshed to %ds", timeout);
        return 1;
 }
 EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
@@ -944,6 +944,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        body->lock_handle[0] = lock->l_remote_handle;
+       body->lock_handle[1].cookie = lock->l_handle.h_cookie;
        body->lock_desc = *desc;
        body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);
 
@@ -1039,6 +1040,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 
        body->lock_handle[0] = lock->l_remote_handle;
+       body->lock_handle[1].cookie = lock->l_handle.h_cookie;
        body->lock_flags = ldlm_flags_to_wire(flags);
        ldlm_lock2desc(lock, &body->lock_desc);
        if (lvb_len > 0) {
@@ -1198,7 +1200,7 @@ int ldlm_glimpse_locks(struct ldlm_resource *res,
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
                               LDLM_WORK_GL_AST);
        if (rc == -ERESTART)
-               ldlm_reprocess_all(res, NULL);
+               ldlm_reprocess_all(res, 0);
 
        RETURN(rc);
 }
@@ -1359,11 +1361,14 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                                     dlm_req->lock_desc.l_resource.lr_type,
                                     &dlm_req->lock_desc.l_policy_data,
                                     &lock->l_policy_data);
-       if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+       if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
                lock->l_req_extent = lock->l_policy_data.l_extent;
-       else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+       } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
                lock->l_policy_data.l_inodebits.try_bits =
                        dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
+               lock->l_policy_data.l_inodebits.li_gid =
+                       dlm_req->lock_desc.l_policy_data.l_inodebits.li_gid;
+       }
 
 existing_lock:
        cookie = req;
@@ -1462,9 +1467,9 @@ existing_lock:
 out:
        req->rq_status = rc ?: err; /* return either error - b=11190 */
        if (!req->rq_packed_final) {
-               err = lustre_pack_reply(req, 1, NULL, NULL);
+               int rc1 = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc == 0)
-                       rc = err;
+                       rc = rc1;
        }
 
        /*
@@ -1542,13 +1547,14 @@ retry:
                                ldlm_resource_unlink_lock(lock);
                                ldlm_lock_destroy_nolock(lock);
                                unlock_res_and_lock(lock);
-
                        }
+                       ldlm_reprocess_all(lock->l_resource, lock->l_policy_data.l_inodebits.bits);
                }
 
                if (!err && !ldlm_is_cbpending(lock) &&
                    dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
-                       ldlm_reprocess_all(lock->l_resource, lock);
+                       ldlm_reprocess_all(lock->l_resource,
+                                          lock->l_policy_data.l_inodebits.bits);
 
                LDLM_LOCK_RELEASE(lock);
        }
@@ -1654,7 +1660,10 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                ldlm_clear_blocking_data(lock);
                unlock_res_and_lock(lock);
 
-               ldlm_reprocess_all(lock->l_resource, NULL);
+               /* All old bits should be reprocessed to send new BL AST if
+                * it wasn't sent earlier due to LDLM_FL_AST_SENT bit set.
+                * */
+               ldlm_reprocess_all(lock->l_resource, bits);
        }
 
        dlm_rep->lock_handle = lock->l_remote_handle;
@@ -1728,7 +1737,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                 */
                if (res != pres) {
                        if (pres != NULL) {
-                               ldlm_reprocess_all(pres, NULL);
+                               ldlm_reprocess_all(pres, 0);
                                LDLM_RESOURCE_DELREF(pres);
                                ldlm_resource_putref(pres);
                        }
@@ -1747,8 +1756,8 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                    lock->l_blast_sent != 0) {
                        timeout_t delay = 0;
 
-                       if (ktime_get_real_seconds() > lock->l_blast_sent)
-                               delay = ktime_get_real_seconds() -
+                       if (ktime_get_seconds() > lock->l_blast_sent)
+                               delay = ktime_get_seconds() -
                                        lock->l_blast_sent;
                        LDLM_DEBUG(lock,
                                   "server cancels blocked lock after %ds",
@@ -1759,7 +1768,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                LDLM_LOCK_PUT(lock);
        }
        if (pres != NULL) {
-               ldlm_reprocess_all(pres, NULL);
+               ldlm_reprocess_all(pres, 0);
                LDLM_RESOURCE_DELREF(pres);
                ldlm_resource_putref(pres);
        }
@@ -1927,7 +1936,7 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
                ldlm_callback_reply(req, 0);
 
                while (to > 0) {
-                       schedule_timeout_interruptible(to);
+                       to = schedule_timeout_interruptible(to);
                        if (ldlm_is_granted(lock) ||
                            ldlm_is_destroyed(lock))
                                break;
@@ -2214,6 +2223,11 @@ int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
        return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
 }
 
+int ldlm_bl_to_thread_ns(struct ldlm_namespace *ns)
+{
+       return ldlm_bl_to_thread(ns, NULL, NULL, NULL, 0, LCF_ASYNC);
+}
+
 int ldlm_bl_thread_wakeup(void)
 {
        wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
@@ -2408,6 +2422,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                ldlm_lock_remove_from_lru(lock);
                ldlm_set_bl_ast(lock);
        }
+       if (lock->l_remote_handle.cookie == 0)
+               lock->l_remote_handle = dlm_req->lock_handle[1];
        unlock_res_and_lock(lock);
 
        /*
@@ -2681,15 +2697,18 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
+       int rc;
        LIST_HEAD(rpc_list);
-
        ENTRY;
 
        cfs_hash_for_each_nolock(exp->exp_lock_hash,
                                 ldlm_revoke_lock_cb, &rpc_list, 0);
-       ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+       rc = ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
                          LDLM_WORK_REVOKE_AST);
 
+       if (rc == -ERESTART)
+               ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
+
        EXIT;
 }
 EXPORT_SYMBOL(ldlm_revoke_export_locks);
@@ -2832,10 +2851,17 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
                                                   LCF_BL_AST);
                ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
                                     blwi->blwi_flags);
-       } else {
+       } else if (blwi->blwi_lock) {
                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                        blwi->blwi_lock);
+       } else {
+               ldlm_pool_recalc(&blwi->blwi_ns->ns_pool, true);
+               spin_lock(&blwi->blwi_ns->ns_lock);
+               blwi->blwi_ns->ns_rpc_recalc = 0;
+               spin_unlock(&blwi->blwi_ns->ns_lock);
+               ldlm_namespace_put(blwi->blwi_ns);
        }
+
        if (blwi->blwi_mem_pressure)
                memalloc_noreclaim_restore(mpflags);
 
@@ -3444,6 +3470,7 @@ void ldlm_exit(void)
 {
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+       synchronize_rcu();
        kmem_cache_destroy(ldlm_resource_slab);
        /*
         * ldlm_lock_put() use RCU to call ldlm_lock_free, so need call