Whamcloud - gitweb
LU-2835 ptlrpc: Fix race during exp_flock_hash creation
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 2af67f2..1cfd813 100644 (file)
@@ -60,8 +60,8 @@ static char *ldlm_cpts;
 CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
                "CPU partitions ldlm threads should run on");
 
-extern cfs_mem_cache_t *ldlm_resource_slab;
-extern cfs_mem_cache_t *ldlm_lock_slab;
+extern struct kmem_cache *ldlm_resource_slab;
+extern struct kmem_cache *ldlm_lock_slab;
 static struct mutex    ldlm_ref_mutex;
 static int ldlm_refcount;
 
@@ -178,7 +178,6 @@ static int expired_lock_main(void *arg)
         int do_dump;
 
         ENTRY;
-        cfs_daemonize("ldlm_elt");
 
         expired_lock_thread.elt_state = ELT_READY;
         cfs_waitq_signal(&expired_lock_thread.elt_waitq);
@@ -213,14 +212,15 @@ static int expired_lock_main(void *arg)
 
                         lock = cfs_list_entry(expired->next, struct ldlm_lock,
                                           l_pending_chain);
-                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
-                            (void *)lock >= LP_POISON) {
+                       if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+                           (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
-                                CERROR("free lock on elt list %p\n", lock);
-                                LBUG();
-                        }
-                        cfs_list_del_init(&lock->l_pending_chain);
-                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
+                               CERROR("free lock on elt list %p\n", lock);
+                               LBUG();
+                       }
+                       cfs_list_del_init(&lock->l_pending_chain);
+                       if ((void *)lock->l_export <
+                            LP_POISON + PAGE_CACHE_SIZE &&
                             (void *)lock->l_export >= LP_POISON) {
                                 CERROR("lock with free export on elt list %p\n",
                                        lock->l_export);
@@ -233,7 +233,7 @@ static int expired_lock_main(void *arg)
                                 continue;
                         }
 
-                       if (lock->l_destroyed) {
+                       if (lock->l_flags & LDLM_FL_DESTROYED) {
                                /* release the lock refcount where
                                 * waiting_locks_callback() founds */
                                LDLM_LOCK_RELEASE(lock);
@@ -324,7 +324,7 @@ static void waiting_locks_callback(unsigned long unused)
                                    libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
 
                         cfs_list_del_init(&lock->l_pending_chain);
-                       if (lock->l_destroyed) {
+                       if (lock->l_flags & LDLM_FL_DESTROYED) {
                                /* relay the lock refcount decrease to
                                 * expired lock thread */
                                cfs_list_add(&lock->l_pending_chain,
@@ -347,7 +347,7 @@ static void waiting_locks_callback(unsigned long unused)
                                    libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
 
                         cfs_list_del_init(&lock->l_pending_chain);
-                       if (lock->l_destroyed) {
+                       if (lock->l_flags & LDLM_FL_DESTROYED) {
                                /* relay the lock refcount decrease to
                                 * expired lock thread */
                                cfs_list_add(&lock->l_pending_chain,
@@ -468,13 +468,13 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        int timeout = ldlm_get_enq_timeout(lock);
 
        /* NB: must be called with hold of lock_res_and_lock() */
-       LASSERT(lock->l_res_locked);
-       lock->l_waited = 1;
+       LASSERT(lock->l_flags & LDLM_FL_RES_LOCKED);
+       lock->l_flags |= LDLM_FL_WAITED;
 
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
 
        spin_lock_bh(&waiting_locks_spinlock);
-       if (lock->l_destroyed) {
+       if (lock->l_flags & LDLM_FL_DESTROYED) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
                 LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
@@ -619,8 +619,8 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 # ifdef HAVE_SERVER_SUPPORT
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
-       LASSERT(lock->l_res_locked);
-       LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+       LASSERT((lock->l_flags & (LDLM_FL_RES_LOCKED|LDLM_FL_CANCEL_ON_BLOCK))
+               == LDLM_FL_RES_LOCKED); /* RES_LOCKED set, CANCEL_ON_BLOCK clear */
        RETURN(1);
 }
 
@@ -878,7 +878,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                RETURN(0);
        }
 
-       if (lock->l_destroyed) {
+       if (lock->l_flags & LDLM_FL_DESTROYED) {
                /* What's the point? */
                unlock_res_and_lock(lock);
                ptlrpc_req_finished(req);
@@ -1731,22 +1731,22 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_lock *lock)
 {
        int lvb_len;
-        CFS_LIST_HEAD(ast_list);
+       CFS_LIST_HEAD(ast_list);
        int rc = 0;
-        ENTRY;
+       ENTRY;
 
-        LDLM_DEBUG(lock, "client completion callback handler START");
+       LDLM_DEBUG(lock, "client completion callback handler START");
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
-                int to = cfs_time_seconds(1);
-                while (to > 0) {
-                        cfs_schedule_timeout_and_set_state(
-                                CFS_TASK_INTERRUPTIBLE, to);
-                        if (lock->l_granted_mode == lock->l_req_mode ||
-                            lock->l_destroyed)
-                                break;
-                }
-        }
+       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
+               int to = cfs_time_seconds(1);
+               while (to > 0) {
+                       cfs_schedule_timeout_and_set_state(
+                               CFS_TASK_INTERRUPTIBLE, to);
+                       if (lock->l_granted_mode == lock->l_req_mode ||
+                           lock->l_flags & LDLM_FL_DESTROYED)
+                               break;
+               }
+       }
 
        lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
        if (lvb_len < 0) {
@@ -1782,29 +1782,29 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                }
        }
 
-        lock_res_and_lock(lock);
-        if (lock->l_destroyed ||
-            lock->l_granted_mode == lock->l_req_mode) {
-                /* bug 11300: the lock has already been granted */
-                unlock_res_and_lock(lock);
-                LDLM_DEBUG(lock, "Double grant race happened");
+       lock_res_and_lock(lock);
+       if ((lock->l_flags & LDLM_FL_DESTROYED) ||
+           lock->l_granted_mode == lock->l_req_mode) {
+               /* bug 11300: the lock has already been granted */
+               unlock_res_and_lock(lock);
+               LDLM_DEBUG(lock, "Double grant race happened");
                GOTO(out, rc = 0);
-        }
+       }
 
-        /* If we receive the completion AST before the actual enqueue returned,
-         * then we might need to switch lock modes, resources, or extents. */
-        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
-                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
-                LDLM_DEBUG(lock, "completion AST, new lock mode");
-        }
+       /* If we receive the completion AST before the actual enqueue returned,
+        * then we might need to switch lock modes, resources, or extents. */
+       if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
+               lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+               LDLM_DEBUG(lock, "completion AST, new lock mode");
+       }
 
-        if (lock->l_resource->lr_type != LDLM_PLAIN) {
-                ldlm_convert_policy_to_local(req->rq_export,
-                                          dlm_req->lock_desc.l_resource.lr_type,
-                                          &dlm_req->lock_desc.l_policy_data,
-                                          &lock->l_policy_data);
-                LDLM_DEBUG(lock, "completion AST, new policy data");
-        }
+       if (lock->l_resource->lr_type != LDLM_PLAIN) {
+               ldlm_convert_policy_to_local(req->rq_export,
+                                         dlm_req->lock_desc.l_resource.lr_type,
+                                         &dlm_req->lock_desc.l_policy_data,
+                                         &lock->l_policy_data);
+               LDLM_DEBUG(lock, "completion AST, new policy data");
+       }
 
         ldlm_resource_unlink_lock(lock);
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
@@ -1961,7 +1961,7 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
        init_completion(&blwi->blwi_comp);
         CFS_INIT_LIST_HEAD(&blwi->blwi_head);
 
-        if (cfs_memory_pressure_get())
+       if (memory_pressure_get())
                 blwi->blwi_mem_pressure = 1;
 
         blwi->blwi_ns = ns;
@@ -2163,16 +2163,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 rc = ldlm_handle_setinfo(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
-                CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         case LLOG_ORIGIN_HANDLE_CREATE:
                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
@@ -2374,15 +2364,6 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                 if (rc)
                         break;
                 RETURN(0);
-        case OBD_LOG_CANCEL:
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         default:
                 CERROR("invalid opcode %d\n",
                        lustre_msg_get_opc(req->rq_reqmsg));
@@ -2582,14 +2563,17 @@ static int ldlm_bl_thread_main(void *arg);
 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
 {
        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
-       int rc;
+       cfs_task_t *task;
 
        init_completion(&bltd.bltd_comp);
-       rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
-       if (rc < 0) {
-               CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
-                      cfs_atomic_read(&blp->blp_num_threads), rc);
-               return rc;
+       bltd.bltd_num = cfs_atomic_read(&blp->blp_num_threads);
+       snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
+               "ldlm_bl_%02d", bltd.bltd_num);
+       task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+       if (IS_ERR(task)) {
+               CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
+                      cfs_atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+               return PTR_ERR(task);
        }
        wait_for_completion(&bltd.bltd_comp);
 
@@ -2613,14 +2597,9 @@ static int ldlm_bl_thread_main(void *arg)
 
                 blp = bltd->bltd_blp;
 
-                bltd->bltd_num =
-                        cfs_atomic_inc_return(&blp->blp_num_threads) - 1;
+               cfs_atomic_inc(&blp->blp_num_threads);
                 cfs_atomic_inc(&blp->blp_busy_threads);
 
-                snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
-                        "ldlm_bl_%02d", bltd->bltd_num);
-                cfs_daemonize(bltd->bltd_name);
-
                complete(&bltd->bltd_comp);
                 /* cannot use bltd after this, it is only on caller's stack */
         }
@@ -2654,7 +2633,7 @@ static int ldlm_bl_thread_main(void *arg)
                         ldlm_bl_thread_start(blp);
 
                 if (blwi->blwi_mem_pressure)
-                        cfs_memory_pressure_set();
+                       memory_pressure_set();
 
                 if (blwi->blwi_count) {
                         int count;
@@ -2672,7 +2651,7 @@ static int ldlm_bl_thread_main(void *arg)
                                                 blwi->blwi_lock);
                 }
                 if (blwi->blwi_mem_pressure)
-                        cfs_memory_pressure_clr();
+                       memory_pressure_clr();
 
                if (blwi->blwi_flags & LCF_ASYNC)
                        OBD_FREE(blwi, sizeof(*blwi));
@@ -2796,6 +2775,7 @@ static cfs_hash_ops_t ldlm_export_lock_ops = {
 
 int ldlm_init_export(struct obd_export *exp)
 {
+       int rc;
         ENTRY;
 
         exp->exp_lock_hash =
@@ -2811,7 +2791,14 @@ int ldlm_init_export(struct obd_export *exp)
         if (!exp->exp_lock_hash)
                 RETURN(-ENOMEM);
 
+       rc = ldlm_init_flock_export(exp);
+       if (rc)
+               GOTO(err, rc);
+
         RETURN(0);
+err:
+       ldlm_destroy_export(exp);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(ldlm_init_export);
 
@@ -2960,22 +2947,22 @@ static int ldlm_setup(void)
        }
 
 # ifdef HAVE_SERVER_SUPPORT
-        CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
-        expired_lock_thread.elt_state = ELT_STOPPED;
-        cfs_waitq_init(&expired_lock_thread.elt_waitq);
+       CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
+       expired_lock_thread.elt_state = ELT_STOPPED;
+       cfs_waitq_init(&expired_lock_thread.elt_waitq);
 
-        CFS_INIT_LIST_HEAD(&waiting_locks_list);
+       CFS_INIT_LIST_HEAD(&waiting_locks_list);
        spin_lock_init(&waiting_locks_spinlock);
-        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
+       cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-        rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
-       if (rc < 0) {
+       rc = PTR_ERR(kthread_run(expired_lock_main, NULL, "ldlm_elt"));
+       if (IS_ERR_VALUE(rc)) {
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out, rc);
        }
 
-        cfs_wait_event(expired_lock_thread.elt_waitq,
-                       expired_lock_thread.elt_state == ELT_READY);
+       cfs_wait_event(expired_lock_thread.elt_waitq,
+                      expired_lock_thread.elt_state == ELT_READY);
 # endif /* HAVE_SERVER_SUPPORT */
 
        rc = ldlm_pools_init();
@@ -3057,26 +3044,26 @@ int ldlm_init(void)
        mutex_init(&ldlm_ref_mutex);
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
-        ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
-                                               sizeof(struct ldlm_resource), 0,
-                                               CFS_SLAB_HWCACHE_ALIGN);
-        if (ldlm_resource_slab == NULL)
-                return -ENOMEM;
+       ldlm_resource_slab = kmem_cache_create("ldlm_resources",
+                                              sizeof(struct ldlm_resource), 0,
+                                              SLAB_HWCACHE_ALIGN, NULL);
+       if (ldlm_resource_slab == NULL)
+               return -ENOMEM;
 
-       ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
+       ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                              sizeof(struct ldlm_lock), 0,
-                             CFS_SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
+                             SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
        if (ldlm_lock_slab == NULL) {
-               cfs_mem_cache_destroy(ldlm_resource_slab);
+               kmem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }
 
-        ldlm_interval_slab = cfs_mem_cache_create("interval_node",
+       ldlm_interval_slab = kmem_cache_create("interval_node",
                                         sizeof(struct ldlm_interval),
-                                        0, CFS_SLAB_HWCACHE_ALIGN);
+                                       0, SLAB_HWCACHE_ALIGN, NULL);
         if (ldlm_interval_slab == NULL) {
-                cfs_mem_cache_destroy(ldlm_resource_slab);
-                cfs_mem_cache_destroy(ldlm_lock_slab);
+               kmem_cache_destroy(ldlm_resource_slab);
+               kmem_cache_destroy(ldlm_lock_slab);
                 return -ENOMEM;
         }
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
@@ -3087,19 +3074,15 @@ int ldlm_init(void)
 
 void ldlm_exit(void)
 {
-        int rc;
-        if (ldlm_refcount)
-                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
-        rc = cfs_mem_cache_destroy(ldlm_resource_slab);
-        LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
+       if (ldlm_refcount)
+               CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+       kmem_cache_destroy(ldlm_resource_slab);
 #ifdef __KERNEL__
-        /* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
-         * synchronize_rcu() to wait a grace period elapsed, so that
-         * ldlm_lock_free() get a chance to be called. */
-        synchronize_rcu();
+       /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
+        * synchronize_rcu() to wait for a grace period to elapse, giving
+        * ldlm_lock_free() a chance to be called. */
+       synchronize_rcu();
 #endif
-        rc = cfs_mem_cache_destroy(ldlm_lock_slab);
-        LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
-        rc = cfs_mem_cache_destroy(ldlm_interval_slab);
-        LASSERTF(rc == 0, "couldn't free interval node slab\n");
+       kmem_cache_destroy(ldlm_lock_slab);
+       kmem_cache_destroy(ldlm_interval_slab);
 }