Whamcloud - gitweb
Added ll_recover_lost_found_objs to .cvsignore
[fs/lustre-release.git] / lustre / obdclass / cl_lock.c
index b75685f..d049754 100644 (file)
@@ -194,12 +194,18 @@ EXPORT_SYMBOL(cl_lock_slice_add);
  */
 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
 {
-        LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
-        LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
+        LINVRNT(need == CLM_READ || need == CLM_WRITE ||
+                need == CLM_PHANTOM || need == CLM_GROUP);
+        LINVRNT(has == CLM_READ || has == CLM_WRITE ||
+                has == CLM_PHANTOM || has == CLM_GROUP);
         CLASSERT(CLM_PHANTOM < CLM_READ);
         CLASSERT(CLM_READ < CLM_WRITE);
+        CLASSERT(CLM_WRITE < CLM_GROUP);
 
-        return need <= has;
+        if (has != CLM_GROUP)
+                return need <= has;
+        else
+                return need == has;
 }
 EXPORT_SYMBOL(cl_lock_mode_match);
 
@@ -212,7 +218,8 @@ int cl_lock_ext_match(const struct cl_lock_descr *has,
         return
                 has->cld_start <= need->cld_start &&
                 has->cld_end >= need->cld_end &&
-                cl_lock_mode_match(has->cld_mode, need->cld_mode);
+                cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
+                (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
 }
 EXPORT_SYMBOL(cl_lock_ext_match);
 
@@ -535,6 +542,7 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
                 if (ok) {
                         cl_lock_hold_add(env, lock, scope, source);
                         cl_lock_user_add(env, lock);
+                        cl_lock_put(env, lock);
                 }
                 cl_lock_mutex_put(env, lock);
                 if (!ok) {
@@ -755,8 +763,17 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
                  * and head->coh_nesting == 1 check assumes two level top-sub
                  * hierarchy.
                  */
-                LASSERT(ergo(head->coh_nesting == 1 &&
-                             list_empty(&head->coh_locks), !head->coh_pages));
+                /*
+                 * The count of pages of this object may NOT be zero because
+                 * we don't cleanup the pages if they are in CPS_FREEING state.
+                 * See cl_page_gang_lookup().
+                 *
+                 * It is safe to leave the CPS_FREEING pages in cache w/o
+                 * a lock, because those page must not be uptodate.
+                 * See cl_page_delete0 for details.
+                 */
+                /* LASSERT(!ergo(head->coh_nesting == 1 &&
+                           list_empty(&head->coh_locks), !head->coh_pages)); */
                 spin_unlock(&head->coh_lock_guard);
                 /*
                  * From now on, no new references to this lock can be acquired
@@ -831,10 +848,11 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         lu_ref_del(&lock->cll_holders, scope, source);
         cl_lock_hold_mod(env, lock, -1);
         if (lock->cll_holds == 0) {
-                if (lock->cll_descr.cld_mode == CLM_PHANTOM)
+                if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
+                    lock->cll_descr.cld_mode == CLM_GROUP)
                         /*
-                         * If lock is still phantom when user is done with
-                         * it---destroy the lock.
+                         * If lock is still phantom or grouplock when user is
+                         * done with it---destroy the lock.
                          */
                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
                 if (lock->cll_flags & CLF_CANCELPEND) {
@@ -1672,6 +1690,7 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
         list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
                 if (scan != except &&
                     cl_lock_ext_match(&scan->cll_descr, need) &&
+                    scan->cll_state >= CLS_HELD &&
                     scan->cll_state < CLS_FREEING &&
                     /*
                      * This check is racy as the lock can be canceled right
@@ -1785,9 +1804,8 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         struct cl_io          *io    = &info->clt_io;
         struct cl_2queue      *queue = &info->clt_queue;
         struct cl_lock_descr  *descr = &lock->cll_descr;
-        int                      result;
-        int                      rc0;
-        int                      rc1;
+        long page_count;
+        int result;
 
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
@@ -1795,24 +1813,38 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         io->ci_obj = cl_object_top(descr->cld_obj);
         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
         if (result == 0) {
+                int nonblock = 1;
 
+restart:
                 cl_2queue_init(queue);
                 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
-                                    descr->cld_end, &queue->c2_qin);
-                if (queue->c2_qin.pl_nr > 0) {
+                                    descr->cld_end, &queue->c2_qin, nonblock);
+                page_count = queue->c2_qin.pl_nr;
+                if (page_count > 0) {
                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
                         if (!discard) {
-                                rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
-                                                      queue, CRP_CANCEL);
-                                rc1 = cl_page_list_own(env, io,
-                                                       &queue->c2_qout);
-                                result = result ?: rc0 ?: rc1;
+                                long timeout = 600; /* 10 minutes. */
+                                /* for debug purpose, if this request can't be
+                                 * finished in 10 minutes, we hope it can
+                                 * notify us.
+                                 */
+                                result = cl_io_submit_sync(env, io, CRT_WRITE,
+                                                           queue, CRP_CANCEL,
+                                                           timeout);
+                                if (result)
+                                        CWARN("Writing %lu pages error: %d\n",
+                                              page_count, result);
                         }
                         cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
                         cl_2queue_discard(env, io, queue);
                         cl_2queue_disown(env, io, queue);
                 }
                 cl_2queue_fini(env, queue);
+
+                if (nonblock) {
+                        nonblock = 0;
+                        goto restart;
+                }
         }
         cl_io_fini(env, io);
         RETURN(result);
@@ -2077,7 +2109,8 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode)
         static const char *names[] = {
                 [CLM_PHANTOM] = "PHANTOM",
                 [CLM_READ]    = "READ",
-                [CLM_WRITE]   = "WRITE"
+                [CLM_WRITE]   = "WRITE",
+                [CLM_GROUP]   = "GROUP"
         };
         if (0 <= mode && mode < ARRAY_SIZE(names))
                 return names[mode];