Whamcloud - gitweb
- landed share_write_lock.patch from #7410
author yury <yury>
Thu, 8 Sep 2005 08:34:57 +0000 (08:34 +0000)
committer yury <yury>
Thu, 8 Sep 2005 08:34:57 +0000 (08:34 +0000)
lustre/cmobd/cm_oss_reint.c
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/osc/osc_request.c

index 4e4e630..c460ffb 100644 (file)
@@ -217,8 +217,7 @@ static int cmobd_write_extents(struct obd_device *obd, struct obdo *oa,
         /* XXX for debug write replay without smfs and kml */
         res_id.name[0]= oa->o_id;
         res_id.name[1]= oa->o_gr;
-        policy.l_extent.start = extent->start;
-        policy.l_extent.end = extent->end;
+        policy.l_extent = *extent;
         
         /* get extent read lock on the source replay file */
         rc = ldlm_cli_enqueue(NULL, NULL, cache->obd_namespace, res_id,
index 391d493..25f4ae7 100644 (file)
@@ -134,6 +134,9 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
 {
         struct ldlm_extent new_ex = { .start = 0, .end = ~0};
 
+        if (lock->l_req_mode == LCK_GROUP)
+                return;
+
         ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
         ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
 
@@ -156,139 +159,126 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
  */
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                         int *flags, ldlm_error_t *err,
-                         struct list_head *work_list)
+                         int *flags, struct list_head *work_list,
+                         struct list_head **insertp)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
         ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
         int compat = 1;
-        int scan = 0;
         ENTRY;
 
         lockmode_verify(req_mode);
 
-        list_for_each(tmp, queue) {
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+        if (req->l_req_mode == LCK_GROUP) {
+                int found = 0;
 
-                if (req == lock)
-                        RETURN(compat);
-
-                if (scan) {
-                        /* We only get here if we are queuing GROUP lock
-                           and met some incompatible one. The main idea of this
-                           code is to insert GROUP lock past compatible GROUP
-                           lock in the waiting queue or if there is not any,
-                           then in front of first non-GROUP lock */
-                        if (lock->l_req_mode != LCK_GROUP) {
-                        /* Ok, we hit non-GROUP lock, there should be no
-                           more GROUP locks later on, queue in front of
-                           first non-GROUP lock */
-
-                                ldlm_resource_insert_lock_after(lock, req);
-                                list_del_init(&lock->l_res_link);
-                                ldlm_resource_insert_lock_after(req, lock);
-                                RETURN(0);
-                        }
-                        if (req->l_policy_data.l_extent.gid ==
-                             lock->l_policy_data.l_extent.gid) {
-                                /* found it */
-                                ldlm_resource_insert_lock_after(lock,
-                                                                req);
-                                RETURN(0);
-                        }
-                        continue;
-                }
+                list_for_each(tmp, queue) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                        if (req == lock)
+                                break;
 
-                /* locks are compatible, overlap doesn't matter */
-                if (lockmode_compat(lock->l_req_mode, req_mode)) {
-                        /* non-group locks are compatible, overlap doesn't
-                           matter */
-                        if (req_mode != LCK_GROUP)
+                        if (lock->l_req_mode != LCK_GROUP) {
+                                /* group locks are not blocked by waiting PR|PW
+                                 * locks. also, any group locks that could have
+                                 * blocked us would have appeared before this
+                                 * PR|PW lock in the list.  */
+                                if (lock->l_req_mode != lock->l_granted_mode)
+                                        break;
+
+                                compat = 0;
+                                if (!work_list)
+                                        break;
+
+                                if (lock->l_blocking_ast)
+                                        ldlm_add_ast_work_item(lock, req,
+                                                               work_list);
                                 continue;
-                                
-                        /* If we are trying to get a GROUP lock and there is
-                           another one of this kind, we need to compare gid */
-                        if (req->l_policy_data.l_extent.gid ==
-                            lock->l_policy_data.l_extent.gid) {
-                                if (lock->l_req_mode == lock->l_granted_mode)
-                                        RETURN(2);
-
-                                /* If we are in nonblocking mode - return
-                                   immediately */
-                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                        compat = -EWOULDBLOCK;
-                                        goto destroylock;
-                                }
-                                /* If this group lock is compatible with another
-                                 * group lock on the waiting list, they must be
-                                 * together in the list, so they can be granted
-                                 * at the same time.  Otherwise the later lock
-                                 * can get stuck behind another, incompatible,
-                                 * lock. */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                /* Because 'lock' is not granted, we can stop
-                                 * processing this queue and return immediately.
-                                 * There is no need to check the rest of the
-                                 * list. */
-                                RETURN(0);
                         }
-                }
 
-                if (req_mode == LCK_GROUP &&
-                    (lock->l_req_mode != lock->l_granted_mode)) {
-                        scan = 1;
-                        compat = 0;
-                        if (lock->l_req_mode != LCK_GROUP) {
-                        /* Ok, we hit non-GROUP lock, there should be no
-                           more GROUP locks later on, queue in front of
-                           first non-GROUP lock */
-
-                                ldlm_resource_insert_lock_after(lock, req);
-                                list_del_init(&lock->l_res_link);
-                                ldlm_resource_insert_lock_after(req, lock);
-                                RETURN(0);
+                        /* no blocking ASTs are sent for group locks. */
+
+                        if (lock->l_policy_data.l_extent.gid !=
+                            req->l_policy_data.l_extent.gid) {
+                                if (*flags & LDLM_FL_BLOCK_NOWAIT)
+                                        RETURN(-EWOULDBLOCK);
+
+                                compat = 0;
+
+                                /* there's a blocking group lock in front
+                                 * of us on the queue.  It can be held
+                                 * indefinitely, so don't timeout. */
+                                *flags |= LDLM_FL_NO_TIMEOUT;
+
+                                /* the only reason to continue traversing the
+                                 * list at this point is to find the proper
+                                 * place to insert the lock in the waitq. */
+                                if (insertp && !found)
+                                        continue;
+
+                                break;
                         }
-                        if (req->l_policy_data.l_extent.gid ==
-                             lock->l_policy_data.l_extent.gid) {
-                                /* found it */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                RETURN(0);
+
+                        /* if a group lock with this gid has already been
+                         * granted then grant this one. */
+                        if (lock->l_req_mode == lock->l_granted_mode) {
+                                compat = 2;
+                                break;
                         }
-                        continue;
+                        found = 1;
                 }
+        } else {
+                __u64 req_start = req->l_req_extent.start;
+                __u64 req_end = req->l_req_extent.end;
+
+                list_for_each(tmp, queue) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                        if (req == lock)
+                                break;
 
-                if (lock->l_req_mode == LCK_GROUP) {
-                        /* If compared lock is GROUP, then requested is PR/PW/=>
-                         * this is not compatible; extent range does not
-                         * matter */
-                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                compat = -EWOULDBLOCK;
-                                goto destroylock;
-                        } else {
+                        if (lock->l_req_mode == LCK_GROUP) {
+                                if (*flags & LDLM_FL_BLOCK_NOWAIT)
+                                        RETURN(-EWOULDBLOCK);
+
+                                /* No blocking ASTs are sent for group locks. */
+                                compat = 0;
+
+                                /* there's a blocking group lock in front
+                                 * of us on the queue.  It can be held
+                                 * indefinitely, so don't timeout. */
                                 *flags |= LDLM_FL_NO_TIMEOUT;
+
+                                if (work_list)
+                                        continue;
+                                else
+                                        break;
                         }
-                } else if (lock->l_policy_data.l_extent.end < req_start ||
-                           lock->l_policy_data.l_extent.start > req_end) {
-                        /* if a non grouplock doesn't overlap skip it */
-                        continue;
-                }
 
-                if (!work_list)
-                        RETURN(0);
+                        /* locks are compatible, overlap doesn't matter */
+                        if (lockmode_compat(lock->l_req_mode, req_mode))
+                                continue;
+
+                        if (lock->l_policy_data.l_extent.end < req_start ||
+                            lock->l_policy_data.l_extent.start > req_end)
+                                continue;
 
-                compat = 0;
-                if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, work_list);
+                        compat = 0;
+
+                        if (!work_list)
+                                break;
+
+                        if (lock->l_blocking_ast)
+                                ldlm_add_ast_work_item(lock, req, work_list);
+                }
+
+                /* Ensure we scan entire list if insertp is requested so that
+                 * the new request will be appended to the end of the list. */
+                LASSERT((insertp == NULL) || (tmp == queue));
         }
 
-        return(compat);
-destroylock:
-        list_del_init(&req->l_res_link);
-        ldlm_lock_destroy(req);
-        *err = compat;
+        if (insertp)
+                *insertp = tmp;
+
         RETURN(compat);
 }
 
@@ -306,6 +296,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 {
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+        struct list_head *insertp = NULL;
         int rc, rc2;
         ENTRY;
 
@@ -313,17 +304,16 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         *err = ELDLM_OK;
 
         if (!first_enq) {
-                /* Careful observers will note that we don't handle -EWOULDBLOCK
-                 * here, but it's ok for a non-obvious reason -- compat_queue
-                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
-                 * flags should always be zero here, and if that ever stops
-                 * being true, we want to find out. */
+                /* -EWOULDBLOCK can't occur here since (flags & BLOCK_NOWAIT)
+                 * lock requests would either be granted or fail on their
+                 * first_enq. flags should always be zero here, and if that
+                 * ever changes we want to find out. */
                 LASSERT(*flags == 0);
-                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
-                                              err, NULL);
+                rc = ldlm_extent_compat_queue(&res->lr_granted, lock,
+                                              flags, NULL, NULL);
                 if (rc == 1) {
                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                                      flags, err, NULL);
+                                                      flags, NULL, NULL);
                 }
                 if (rc == 0)
                         RETURN(LDLM_ITER_STOP);
@@ -335,19 +325,21 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         }
 
  restart:
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
+        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, &rpc_list,
+                                      NULL);
         if (rc < 0)
-                GOTO(out, rc); /* lock was destroyed */
-        if (rc == 2) {
+                GOTO(destroylock, rc);
+        if (rc == 2)
                 goto grant;
-        }
 
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
+        /* Traverse the waiting list in case there are other conflicting
+         * lock requests ahead of us in the queue and send blocking ASTs */
+        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, &rpc_list,
+                                       &insertp);
         if (rc2 < 0)
-                GOTO(out, rc = rc2); /* lock was destroyed */
-
+                GOTO(destroylock, rc);
         if (rc + rc2 == 2) {
-      grant:
+ grant:
                 ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
                 ldlm_grant_lock(lock, NULL);
@@ -359,7 +351,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * terrible folly -- if we goto restart, we could get
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
-                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+                        ldlm_resource_add_lock(res, insertp, lock);
                 unlock_res(res);
                 rc = ldlm_run_bl_ast_work(&rpc_list);
                 lock_res(res);
@@ -367,8 +359,15 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         }
-        rc = 0;
-out:
+
+        RETURN(0);
+
+ destroylock:
+        list_del_init(&lock->l_res_link);
+        unlock_res(res);
+        ldlm_lock_destroy(lock);
+        lock_res(res);
+        *err = rc;
         RETURN(rc);
 }
 
index c6ee99e..8d992b2 100644 (file)
@@ -30,8 +30,6 @@ typedef enum {
 int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync);
 
 /* ldlm_resource.c */
-void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
-                                     struct ldlm_lock *new);
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
 
 /* ldlm_lock.c */
index 606ccc8..13d3be3 100644 (file)
@@ -498,11 +498,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 lock->l_writers--;
         }
 
-        if (lock->l_flags & LDLM_FL_LOCAL &&
+        if (((lock->l_flags & LDLM_FL_LOCAL) || (mode == LCK_GROUP)) &&
             !lock->l_readers && !lock->l_writers) {
-                /* If this is a local lock on a server namespace and this was
-                 * the last reference, cancel the lock. */
-                CDEBUG(D_INFO, "forcing cancel of local lock\n");
+                /* If this is a local lock on a server namespace or a group
+                 * lock and this was the last reference, cancel the lock. */
+                CDEBUG(D_INFO, "forcing cancel of local or group lock\n");
                 lock->l_flags |= LDLM_FL_CBPENDING;
         }
 
@@ -625,16 +625,19 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                 if (!(lock->l_req_mode & mode))
                         continue;
 
-                if (lock->l_resource->lr_type == LDLM_EXTENT &&
-                    (lock->l_policy_data.l_extent.start >
-                     policy->l_extent.start ||
-                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
-                        continue;
-
-                if (lock->l_resource->lr_type == LDLM_EXTENT &&
-                    mode == LCK_GROUP &&
-                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
-                        continue;
+                if (lock->l_resource->lr_type == LDLM_EXTENT) {
+                        if (mode == LCK_GROUP) {
+                                if (lock->l_policy_data.l_extent.gid !=
+                                    policy->l_extent.gid)
+                                        continue;
+                        } else {
+                                if ((lock->l_policy_data.l_extent.start >
+                                     policy->l_extent.start) ||
+                                    (lock->l_policy_data.l_extent.end <
+                                     policy->l_extent.end))
+                                        continue;
+                        }
+                }
 
                 /* We match if we have existing lock with same or wider set
                    of bits. */
@@ -1285,11 +1288,13 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
                ldlm_lockname[lock->l_granted_mode],
                atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers);
         if (lock->l_resource->lr_type == LDLM_EXTENT)
-                CDEBUG(level, "  Extent: "LPU64" -> "LPU64
-                       " (req "LPU64"-"LPU64")\n",
+                CDEBUG(level, "  Extent: "LPU64" -> "LPU64" gid: "LPU64
+                       " (req "LPU64"-"LPU64" gid: "LPU64")\n",
                        lock->l_policy_data.l_extent.start,
                        lock->l_policy_data.l_extent.end,
-                       lock->l_req_extent.start, lock->l_req_extent.end);
+                       lock->l_policy_data.l_extent.gid,
+                       lock->l_req_extent.start, lock->l_req_extent.end,
+                       lock->l_req_extent.gid);
         else if (lock->l_resource->lr_type == LDLM_FLOCK)
                 CDEBUG(level, "  Pid: "LPU64" Extent: "LPU64" -> "LPU64"\n",
                        lock->l_policy_data.l_flock.pid,
index e367cc1..4e32bd4 100644 (file)
@@ -478,7 +478,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         do_gettimeofday(&granted_time);
         total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
 
-        if (total_enqueue_wait / 1000000 > obd_timeout)
+        if (((flags & LDLM_FL_NO_TIMEOUT) == 0) &&
+            ((total_enqueue_wait / 1000000) > obd_timeout))
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
         lock_res_and_lock(lock);
index 3633d87..e3c4011 100644 (file)
@@ -708,26 +708,6 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
         list_add_tail(&lock->l_res_link, head);
 }
 
-void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
-                                     struct ldlm_lock *new)
-{
-        struct ldlm_resource *res = original->l_resource;
-
-        check_res_locked(res);
-
-        ldlm_resource_dump(D_OTHER, res);
-        CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original);
-        ldlm_lock_dump(D_OTHER, new, 0);
-
-        if (new->l_destroyed) {
-                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
-                return;
-        }
-
-        LASSERT(list_empty(&new->l_res_link));
-        list_add(&new->l_res_link, &original->l_res_link);
-}
-
 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
 {
         check_res_locked(lock->l_resource);
index b3d4d68..361a5fe 100644 (file)
@@ -238,10 +238,9 @@ static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
         }
 #endif
 
-        /* 
-         * all real clients should perform actual connection rightaway, because
+        /* all real clients should perform actual connection right away, because
          * it is possible, that LMV will not have opportunity to connect
-         * targets, as MDC stuff will bit called directly, for instance while
+         * targets, as MDC stuff will be called directly, for instance while
          * reading ../mdc/../kbytesfree procfs file, etc.
          */
         if (flags & OBD_OPT_REAL_CLIENT)
index cbb114d..8774a94 100644 (file)
@@ -1686,8 +1686,7 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                 LASSERT(lov_lockhp);
 
                 *flags = save_flags;
-                sub_policy.l_extent.start = req->rq_extent.start;
-                sub_policy.l_extent.end = req->rq_extent.end;
+                sub_policy.l_extent = req->rq_extent;
 
                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
                                  type, &sub_policy, mode, flags, bl_cb,
@@ -1731,8 +1730,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
                 LASSERT(lov_lockhp);
 
-                sub_policy.l_extent.start = req->rq_extent.start;
-                sub_policy.l_extent.end = req->rq_extent.end;
+                sub_policy.l_extent = req->rq_extent;
                 lov_flags = *flags;
 
                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
index f99307c..d5eac9b 100644 (file)
@@ -315,6 +315,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
                 req->rq_extent.start = start;
                 req->rq_extent.end = end;
+                req->rq_extent.gid = policy->l_extent.gid;
 
                 req->rq_idx = loi->loi_ost_idx;
                 req->rq_gen = loi->loi_ost_gen;
@@ -424,6 +425,7 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
 
                 req->rq_extent.start = start;
                 req->rq_extent.end = end;
+                req->rq_extent.gid = policy->l_extent.gid;
 
                 req->rq_idx = loi->loi_ost_idx;
                 req->rq_gen = loi->loi_ost_gen;
@@ -1324,6 +1326,7 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
 
                 req->rq_extent.start = rs;
                 req->rq_extent.end = re;
+                req->rq_extent.gid = -1;
 
                 lov_set_add_req(req, set);
         }
@@ -1409,6 +1412,8 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
                 req->rq_oa->o_id = loi->loi_id;
                 req->rq_extent.start = rs;
                 req->rq_extent.end = re;
+                req->rq_extent.gid = -1;
+
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
index 70ce8a4..b41bbf5 100644 (file)
@@ -2764,11 +2764,7 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
 {
         ENTRY;
 
-        if (mode == LCK_GROUP)
-                ldlm_lock_decref_and_cancel(lockh, mode);
-        else
-                ldlm_lock_decref(lockh, mode);
-
+        ldlm_lock_decref(lockh, mode);
         RETURN(0);
 }