Whamcloud - gitweb
LU-5398 ldlm: handle NULL lock in ldlm_handle_enqueue0()
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 084e3e3..f56c776 100644 (file)
@@ -110,8 +110,8 @@ struct ldlm_bl_pool {
 
        wait_queue_head_t       blp_waitq;
        struct completion       blp_comp;
-       cfs_atomic_t            blp_num_threads;
-       cfs_atomic_t            blp_busy_threads;
+       atomic_t            blp_num_threads;
+       atomic_t            blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
 };
@@ -339,7 +339,7 @@ static void waiting_locks_callback(unsigned long unused)
                 ldlm_lock_to_ns(lock)->ns_timeouts++;
                 LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                            "evicting client at %s ",
-                           cfs_time_current_sec()- lock->l_last_activity,
+                           cfs_time_current_sec() - lock->l_last_activity,
                            libcfs_nid2str(
                                    lock->l_export->exp_connection->c_peer.nid));
 
@@ -717,9 +717,9 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         LDLM_LOCK_RELEASE(lock);
 
        if (rc == -ERESTART)
-               cfs_atomic_inc(&arg->restart);
+               atomic_inc(&arg->restart);
 
-        RETURN(0);
+       RETURN(0);
 }
 
 static inline int ldlm_ast_fini(struct ptlrpc_request *req,
@@ -734,7 +734,7 @@ static inline int ldlm_ast_fini(struct ptlrpc_request *req,
                rc = ptl_send_rpc(req, 1);
                ptlrpc_req_finished(req);
                if (rc == 0)
-                       cfs_atomic_inc(&arg->restart);
+                       atomic_inc(&arg->restart);
        } else {
                LDLM_LOCK_GET(lock);
                ptlrpc_set_add_req(arg->set, req);
@@ -862,6 +862,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (AT_OFF)
                 req->rq_timeout = ldlm_get_rq_timeout();
 
+       lock->l_last_activity = cfs_time_current_sec();
+
         if (lock->l_export && lock->l_export->exp_nid_stats &&
             lock->l_export->exp_nid_stats->nid_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
@@ -898,6 +900,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
                                           lock->l_last_activity);
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_OST_LDLM_REPLY_NET)) {
+               LDLM_DEBUG(lock, "dropping CP AST");
+               RETURN(0);
+       }
+
         req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
                                     &RQF_LDLM_CP_CALLBACK);
         if (req == NULL)
@@ -954,6 +961,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
                    total_enqueue_wait);
 
+       lock->l_last_activity = cfs_time_current_sec();
+
         /* Server-side enqueue wait time estimate, used in
             __ldlm_add_waiting_lock to set future enqueue timers */
         if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
@@ -1070,6 +1079,8 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         if (AT_OFF)
                 req->rq_timeout = ldlm_get_rq_timeout();
 
+       lock->l_last_activity = cfs_time_current_sec();
+
        req->rq_interpret_reply = ldlm_cb_interpret;
 
         if (lock->l_export && lock->l_export->exp_nid_stats &&
@@ -1226,7 +1237,8 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         }
 #endif
 
-        if (unlikely(flags & LDLM_FL_REPLAY)) {
+       if (unlikely((flags & LDLM_FL_REPLAY) ||
+                    (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
                 /* Find an existing lock in the per-export lock hash */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
@@ -1236,17 +1248,21 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 if (lock != NULL) {
                         DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                   LPX64, lock->l_handle.h_cookie);
+                       flags |= LDLM_FL_RESENT;
                         GOTO(existing_lock, rc = 0);
-                }
+               }
         }
 
-        /* The lock's callback data might be set in the policy function */
-        lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
-                                dlm_req->lock_desc.l_resource.lr_type,
-                                dlm_req->lock_desc.l_req_mode,
+       /* The lock's callback data might be set in the policy function */
+       lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
+                               dlm_req->lock_desc.l_resource.lr_type,
+                               dlm_req->lock_desc.l_req_mode,
                                cbs, NULL, 0, LVB_T_NONE);
-        if (!lock)
-                GOTO(out, rc = -ENOMEM);
+       if (IS_ERR(lock)) {
+               rc = PTR_ERR(lock);
+               lock = NULL;
+               GOTO(out, rc);
+       }
 
         lock->l_last_activity = cfs_time_current_sec();
         lock->l_remote_handle = dlm_req->lock_handle[0];
@@ -1383,11 +1399,11 @@ existing_lock:
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
-        if (lock) {
-                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
-                           "(err=%d, rc=%d)", err, rc);
+       if (lock != NULL) {
+               LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
+                          "(err=%d, rc=%d)", err, rc);
 
-                if (rc == 0) {
+               if (rc == 0) {
                        if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
                                                  RCL_SERVER) &&
                            ldlm_lvbo_size(lock) > 0) {
@@ -1400,7 +1416,7 @@ existing_lock:
                                         req, lock);
                                buflen = req_capsule_get_size(&req->rq_pill,
                                                &RMF_DLM_LVB, RCL_SERVER);
-                               if (buflen >= 0) {
+                               if (buflen > 0) {
                                        buflen = ldlm_lvbo_fill(lock, buf,
                                                                buflen);
                                        if (buflen >= 0)
@@ -1410,9 +1426,9 @@ existing_lock:
                                                        buflen, RCL_SERVER);
                                        else
                                                rc = buflen;
-                               }
-                               else
+                               } else {
                                        rc = buflen;
+                               }
                        }
                 } else {
                         lock_res_and_lock(lock);
@@ -2487,22 +2503,22 @@ static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
        static unsigned int num_bl = 0;
 
        spin_lock(&blp->blp_lock);
-        /* process a request from the blp_list at least every blp_num_threads */
-        if (!cfs_list_empty(&blp->blp_list) &&
-            (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
-                blwi = cfs_list_entry(blp->blp_list.next,
-                                      struct ldlm_bl_work_item, blwi_entry);
-        else
-                if (!cfs_list_empty(&blp->blp_prio_list))
-                        blwi = cfs_list_entry(blp->blp_prio_list.next,
-                                              struct ldlm_bl_work_item,
-                                              blwi_entry);
-
-        if (blwi) {
-                if (++num_bl >= cfs_atomic_read(&blp->blp_num_threads))
-                        num_bl = 0;
-                cfs_list_del(&blwi->blwi_entry);
-        }
+       /* process a request from the blp_list at least every blp_num_threads */
+       if (!cfs_list_empty(&blp->blp_list) &&
+           (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
+               blwi = cfs_list_entry(blp->blp_list.next,
+                                     struct ldlm_bl_work_item, blwi_entry);
+       else
+               if (!cfs_list_empty(&blp->blp_prio_list))
+                       blwi = cfs_list_entry(blp->blp_prio_list.next,
+                                             struct ldlm_bl_work_item,
+                                             blwi_entry);
+
+       if (blwi) {
+               if (++num_bl >= atomic_read(&blp->blp_num_threads))
+                       num_bl = 0;
+               cfs_list_del(&blwi->blwi_entry);
+       }
        spin_unlock(&blp->blp_lock);
 
        return blwi;
@@ -2524,13 +2540,13 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
        struct task_struct *task;
 
        init_completion(&bltd.bltd_comp);
-       bltd.bltd_num = cfs_atomic_read(&blp->blp_num_threads);
+       bltd.bltd_num = atomic_read(&blp->blp_num_threads);
        snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
                "ldlm_bl_%02d", bltd.bltd_num);
        task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
        if (IS_ERR(task)) {
                CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
-                      cfs_atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+                      atomic_read(&blp->blp_num_threads), PTR_ERR(task));
                return PTR_ERR(task);
        }
        wait_for_completion(&bltd.bltd_comp);
@@ -2548,51 +2564,50 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
 static int ldlm_bl_thread_main(void *arg)
 {
         struct ldlm_bl_pool *blp;
+       struct ldlm_bl_thread_data *bltd = arg;
         ENTRY;
 
-        {
-                struct ldlm_bl_thread_data *bltd = arg;
-
-                blp = bltd->bltd_blp;
+       blp = bltd->bltd_blp;
 
-               cfs_atomic_inc(&blp->blp_num_threads);
-                cfs_atomic_inc(&blp->blp_busy_threads);
-
-               complete(&bltd->bltd_comp);
-                /* cannot use bltd after this, it is only on caller's stack */
-        }
+       atomic_inc(&blp->blp_num_threads);
+       atomic_inc(&blp->blp_busy_threads);
 
-        while (1) {
-                struct l_wait_info lwi = { 0 };
-                struct ldlm_bl_work_item *blwi = NULL;
-                int busy;
+       complete(&bltd->bltd_comp);
+       /* cannot use bltd after this, it is only on caller's stack */
 
-                blwi = ldlm_bl_get_work(blp);
-
-                if (blwi == NULL) {
-                        cfs_atomic_dec(&blp->blp_busy_threads);
-                        l_wait_event_exclusive(blp->blp_waitq,
-                                         (blwi = ldlm_bl_get_work(blp)) != NULL,
-                                         &lwi);
-                        busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-                } else {
-                        busy = cfs_atomic_read(&blp->blp_busy_threads);
-                }
+       while (1) {
+               struct l_wait_info lwi = { 0 };
+               struct ldlm_bl_work_item *blwi = NULL;
+               int busy;
+
+               blwi = ldlm_bl_get_work(blp);
+
+               if (blwi == NULL) {
+                       atomic_dec(&blp->blp_busy_threads);
+                       l_wait_event_exclusive(blp->blp_waitq,
+                                        (blwi = ldlm_bl_get_work(blp)) != NULL,
+                                        &lwi);
+                       busy = atomic_inc_return(&blp->blp_busy_threads);
+               } else {
+                       busy = atomic_read(&blp->blp_busy_threads);
+               }
 
-                if (blwi->blwi_ns == NULL)
-                        /* added by ldlm_cleanup() */
-                        break;
+               if (blwi->blwi_ns == NULL)
+                       /* added by ldlm_cleanup() */
+                       break;
 
-                /* Not fatal if racy and have a few too many threads */
-                if (unlikely(busy < blp->blp_max_threads &&
-                             busy >= cfs_atomic_read(&blp->blp_num_threads) &&
-                             !blwi->blwi_mem_pressure))
-                        /* discard the return value, we tried */
-                        ldlm_bl_thread_start(blp);
+               /* Not fatal if racy and have a few too many threads */
+               if (unlikely(busy < blp->blp_max_threads &&
+                            busy >= atomic_read(&blp->blp_num_threads) &&
+                            !blwi->blwi_mem_pressure))
+                       /* discard the return value, we tried */
+                       ldlm_bl_thread_start(blp);
 
                 if (blwi->blwi_mem_pressure)
                        memory_pressure_set();
 
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
                 if (blwi->blwi_count) {
                         int count;
                        /* The special case when we cancel locks in LRU
@@ -2615,12 +2630,12 @@ static int ldlm_bl_thread_main(void *arg)
                        OBD_FREE(blwi, sizeof(*blwi));
                else
                        complete(&blwi->blwi_comp);
-        }
+       }
 
-        cfs_atomic_dec(&blp->blp_busy_threads);
-        cfs_atomic_dec(&blp->blp_num_threads);
+       atomic_dec(&blp->blp_busy_threads);
+       atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
-        RETURN(0);
+       RETURN(0);
 }
 
 #endif
@@ -2774,11 +2789,15 @@ EXPORT_SYMBOL(ldlm_destroy_export);
 static int ldlm_setup(void)
 {
        static struct ptlrpc_service_conf       conf;
-       struct ldlm_bl_pool                     *blp = NULL;
-        int rc = 0;
+       struct ldlm_bl_pool                    *blp = NULL;
 #ifdef __KERNEL__
-        int i;
+# ifdef HAVE_SERVER_SUPPORT
+       struct task_struct *task;
+# endif
+       int i;
 #endif
+       int rc = 0;
+
         ENTRY;
 
         if (ldlm_state != NULL)
@@ -2885,8 +2904,8 @@ static int ldlm_setup(void)
        CFS_INIT_LIST_HEAD(&blp->blp_list);
        CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
        init_waitqueue_head(&blp->blp_waitq);
-       cfs_atomic_set(&blp->blp_num_threads, 0);
-       cfs_atomic_set(&blp->blp_busy_threads, 0);
+       atomic_set(&blp->blp_num_threads, 0);
+       atomic_set(&blp->blp_busy_threads, 0);
 
 #ifdef __KERNEL__
        if (ldlm_num_threads == 0) {
@@ -2913,8 +2932,9 @@ static int ldlm_setup(void)
        spin_lock_init(&waiting_locks_spinlock);
        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-       rc = PTR_ERR(kthread_run(expired_lock_main, NULL, "ldlm_elt"));
-       if (IS_ERR_VALUE(rc)) {
+       task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out, rc);
        }
@@ -2954,7 +2974,7 @@ static int ldlm_cleanup(void)
        if (ldlm_state->ldlm_bl_pool != NULL) {
                struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
 
-               while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
+               while (atomic_read(&blp->blp_num_threads) > 0) {
                        struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
 
                        init_completion(&blp->blp_comp);