Whamcloud - gitweb
most of the work for lock replay:
authorpschwan <pschwan>
Sun, 27 Oct 2002 03:03:50 +0000 (03:03 +0000)
committerpschwan <pschwan>
Sun, 27 Oct 2002 03:03:50 +0000 (03:03 +0000)
  - add a dlm replay flag that:
    - prevents the calling of policy functions
    - tells the server to trust the client's judgement
  - a function to call ldlm_cli_enqueue for replay

next up:
  - something to help replay _all_ locks in a given namespace, probably in
    the form of some generic ldlm_namespace_foreach thing

lustre/include/linux/lustre_dlm.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_request.c
lustre/llite/file.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_reint.c

index c07d6ff..791f425 100644 (file)
@@ -39,6 +39,7 @@ typedef enum {
 #define LDLM_FL_DESTROYED      (1 << 6)
 #define LDLM_FL_WAIT_NOREPROC  (1 << 7)
 #define LDLM_FL_CANCEL         (1 << 8)
+#define LDLM_FL_REPLAY         (1 << 9)
 
 #define LDLM_CB_BLOCKING    1
 #define LDLM_CB_CANCELING   2
@@ -144,8 +145,6 @@ struct ldlm_lock {
         struct lustre_handle   l_remote_handle;
         void                 *l_data;
         __u32                 l_data_len;
-        void                 *l_cookie;
-        int                   l_cookie_len;
         struct ldlm_extent    l_extent;
         __u32                 l_version[RES_VERSION_SIZE];
 
@@ -308,7 +307,7 @@ void ldlm_lock_put(struct ldlm_lock *lock);
 void ldlm_lock_destroy(struct ldlm_lock *lock);
 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
-void ldlm_lock_addref_internal(struct ldlm_lock, __u32 mode);
+void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_grant_lock(struct ldlm_lock *lock);
 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
index a361cab..7e0e93c 100644 (file)
@@ -564,7 +564,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                         continue;
 
                 /* lock_convert() takes the resource lock, so we're sure that
-                 * req_mode, lr_type, and l_cookie won't change beneath us */
+                 * req_mode and lr_type won't change beneath us */
                 if (lock->l_req_mode != mode)
                         continue;
 
@@ -673,9 +673,10 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
         if (res->lr_type == LDLM_EXTENT)
                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
 
-        /* policies are not executed on the client */
+        /* policies are not executed on the client or during replay */
         local = res->lr_namespace->ns_client;
-        if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
+        if (!local && !(*flags & LDLM_FL_REPLAY) &&
+            (policy = ldlm_res_policy_table[res->lr_type])) {
                 int rc;
                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
 
@@ -688,9 +689,6 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
                 }
         }
 
-        lock->l_cookie = cookie;
-        lock->l_cookie_len = cookie_len;
-
         l_lock(&res->lr_namespace->ns_lock);
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
@@ -705,9 +703,15 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
          * namespace only has information about locks taken by that client, and
          * thus doesn't have enough information to decide for itself if it can
          * be granted (below).  In this case, we do exactly what the server
-         * tells us to do, as dictated by the 'flags' */
+         * tells us to do, as dictated by the 'flags'.
+         *
+         * We do exactly the same thing during recovery, when the server is
+         * more or less trusting the clients not to lie.
+         *
+         * FIXME (bug 629283): Detect obvious lies by checking compatibility in
+         * granted/converting queues. */
         ldlm_resource_unlink_lock(lock);
-        if (local) {
+        if (local || (*flags & LDLM_FL_REPLAY)) {
                 if (*flags & LDLM_FL_BLOCK_CONV)
                         ldlm_resource_add_lock(res, res->lr_converting.prev,
                                                lock);
index 4d16f31..4117b67 100644 (file)
@@ -169,24 +169,37 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        int rc, size = sizeof(*body), req_passed_in = 1;
+        int rc, size = sizeof(*body), req_passed_in = 1, is_replay;
         ENTRY;
 
+        is_replay = *flags & LDLM_FL_REPLAY;
+        LASSERT(connh != NULL || !is_replay);
+
         if (connh == NULL)
                 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
                                               type, cookie, cookielen, mode,
                                               flags, completion, blocking, data,
                                               data_len, lockh);
 
-        *flags = 0;
-        lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
-                                data, data_len);
-        if (lock == NULL)
-                GOTO(out_nolock, rc = -ENOMEM);
-        LDLM_DEBUG(lock, "client-side enqueue START");
-        /* for the local lock, add the reference */
-        ldlm_lock_addref_internal(lock, mode);
-        ldlm_lock2handle(lock, lockh);
+        /* If we're replaying this lock, just check some invariants.
+         * If we're creating a new lock, get everything all setup nice. */
+        if (is_replay) {
+                lock = ldlm_handle2lock(lockh);
+                LDLM_DEBUG(lock, "client-side enqueue START");
+                LASSERT(connh == lock->l_connh);
+        } else {
+                lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
+                                        mode, data, data_len);
+                if (lock == NULL)
+                        GOTO(out_nolock, rc = -ENOMEM);
+                LDLM_DEBUG(lock, "client-side enqueue START");
+                /* for the local lock, add the reference */
+                ldlm_lock_addref_internal(lock, mode);
+                ldlm_lock2handle(lock, lockh);
+                if (type == LDLM_EXTENT)
+                        memcpy(&lock->l_extent, cookie,
+                               sizeof(body->lock_desc.l_extent));
+        }
 
         if (req == NULL) {
                 req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
@@ -197,17 +210,9 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
                 LBUG();
 
-        /* Dump all of this data into the request buffer */
+        /* Dump lock data into the request buffer */
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         ldlm_lock2desc(lock, &body->lock_desc);
-        /* Phil: make this part of ldlm_lock2desc */
-        if (type == LDLM_EXTENT) {
-                memcpy(&body->lock_desc.l_extent, cookie,
-                       sizeof(body->lock_desc.l_extent));
-                CDEBUG(D_INFO, "extent in body: "LPU64" -> "LPU64"\n",
-                           body->lock_desc.l_extent.start,
-                           body->lock_desc.l_extent.end);
-        }
         body->lock_flags = *flags;
 
         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
@@ -223,11 +228,12 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         lock->l_connh = connh;
         lock->l_export = NULL;
 
+        LDLM_DEBUG(lock, "sending request");
         rc = ptlrpc_queue_wait(req);
-        /* FIXME: status check here? */
         rc = ptlrpc_check_status(req, rc);
 
         if (rc != ELDLM_OK) {
+                LASSERT(!is_replay);
                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 ldlm_lock_decref(lockh, mode);
@@ -240,22 +246,26 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         reply = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
-        if (type == LDLM_EXTENT)
-                memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
         *flags = reply->lock_flags;
 
         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
                (void *)(unsigned long)reply->lock_handle.addr, *flags);
-        CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got extent "
-               LPU64" -> "LPU64"\n",
-               body->lock_desc.l_extent.start, body->lock_desc.l_extent.end,
-               reply->lock_extent.start, reply->lock_extent.end);
+        if (type == LDLM_EXTENT) {
+                CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
+                       "extent "LPU64" -> "LPU64"\n",
+                       body->lock_desc.l_extent.start,
+                       body->lock_desc.l_extent.end,
+                       reply->lock_extent.start, reply->lock_extent.end);
+                cookie = &reply->lock_extent; /* FIXME bug 629281 */
+                cookielen = sizeof(reply->lock_extent);
+        }
 
         /* If enqueue returned a blocked lock but the completion handler has
          * already run, then it fixed up the resource and we don't need to do it
          * again. */
         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
                 int newmode = reply->lock_mode;
+                LASSERT(!is_replay);
                 if (newmode && newmode != lock->l_req_mode) {
                         LDLM_DEBUG(lock, "server returned different mode %s",
                                    ldlm_lockname[newmode]);
@@ -282,10 +292,12 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         if (!req_passed_in)
                 ptlrpc_req_finished(req);
 
-        rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
-                               blocking);
-        if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, *flags);
+        if (!is_replay) {
+                rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags,
+                                       completion, blocking);
+                if (lock->l_completion_ast)
+                        lock->l_completion_ast(lock, *flags);
+        }
 
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
@@ -325,10 +337,20 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                 RETURN(0);
 }
 
+int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
+{
+        struct lustre_handle lockh;
+        int flags = LDLM_FL_REPLAY;
+        ldlm_lock2handle(lock, &lockh);
+        return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, NULL,
+                                lock->l_resource->lr_type, NULL, 0, -1, &flags,
+                                NULL, NULL, NULL, 0, &lockh);
+}
+
 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
                                   int *flags)
 {
-
+        ENTRY;
         if (lock->l_resource->lr_namespace->ns_client) {
                 CERROR("Trying to cancel local lock\n");
                 LBUG();
@@ -365,7 +387,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         connh = lock->l_connh;
 
         if (!connh)
-                return ldlm_cli_convert_local(lock, new_mode, flags);
+                RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
 
         LDLM_DEBUG(lock, "client-side convert");
 
@@ -470,12 +492,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         return rc;
 }
 
-/* Cancel all locks on a given resource that have 0 readers/writers.
- *
- * If 'local_only' is true, throw the locks away without trying to notify the
- * server. */
-int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
-                           int local_only)
+static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
+                                           __u64 *res_id, int local_only)
 {
         struct ldlm_resource *res;
         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
@@ -534,3 +552,39 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
 
         RETURN(0);
 }
+
+/* Cancel all locks on a namespace (or a specific resource, if given) that have
+ * 0 readers/writers.
+ *
+ * If 'local_only' is true, throw the locks away without trying to notify the
+ * server. */
+int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
+                           int local_only)
+{
+        int i;
+
+        if (res_id)
+                RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, local_only));
+
+        l_lock(&ns->ns_lock);
+        for (i = 0; i < RES_HASH_SIZE; i++) {
+                struct list_head *tmp, *pos;
+                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+                        int rc;
+                        struct ldlm_resource *res;
+                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
+                        ldlm_resource_getref(res);
+
+                        rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
+                                                             local_only);
+
+                        if (rc)
+                                CERROR("cancel_unused_res ("LPU64"): %d\n",
+                                       res->lr_name[0], rc);
+                        ldlm_resource_put(res);
+                }
+        }
+        l_unlock(&ns->ns_lock);
+
+        return ELDLM_OK;
+}
index 9429b79..7778e01 100644 (file)
@@ -415,7 +415,6 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         struct ll_file_data *fd = (struct ll_file_data *)filp->private_data;
         struct inode *inode = filp->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ldlm_extent extent;
         struct lustre_handle *lockhs = NULL;
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         int flags = 0;
@@ -425,6 +424,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
+                struct ldlm_extent extent;
                 OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
                 if (!lockhs)
                         RETURN(-ENOMEM);
@@ -475,7 +475,6 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         struct ll_file_data *fd = (struct ll_file_data *)file->private_data;
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ldlm_extent extent;
         struct lustre_handle *lockhs = NULL, *eof_lockhs = NULL;
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         int flags = 0;
@@ -514,6 +513,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
+                struct ldlm_extent extent;
                 OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
                 if (!lockhs)
                         GOTO(out_eof, retval = -ENOMEM);
index 1f9c07b..1778727 100644 (file)
@@ -205,7 +205,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         struct obd_device *obddev = class_conn2obd(conn);
         __u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
-        int rc, flags;
+        int rc, flags = 0;
         int repsize[3] = {sizeof(struct ldlm_reply),
                           sizeof(struct mds_body),
                           obddev->u.cli.cl_max_mds_easize};
index 0419a75..4629542 100644 (file)
@@ -557,7 +557,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         struct inode *dir;
         struct lustre_handle lockh;
         char *name;
-        int namelen, flags, lock_mode, rc = 0;
+        int namelen, flags = 0, lock_mode, rc = 0;
         struct obd_ucred uc;
         __u64 res_id[3] = {0, 0, 0};
         ENTRY;
index 60ea97a..45cd424 100644 (file)
@@ -127,7 +127,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         inode = de->d_inode;
         CDEBUG(D_INODE, "ino %ld\n", inode->i_ino);
 
-        OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, 
+        OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
                        to_kdev_t(inode->i_sb->s_dev));
 
         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
@@ -413,7 +413,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                "child missing (%ld/%s); OK for REPLAYING\n",
                                dir->i_ino, rec->ur_name);
                         rc = 0;
-                } else { 
+                } else {
                         CDEBUG(D_INODE,
                                "child doesn't exist (dir %ld, name %s)\n",
                                dir->i_ino, rec->ur_name);
@@ -432,7 +432,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 mds_pack_inode2body(body, inode);
         }
 
-        OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, 
+        OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
                        to_kdev_t(dir->i_sb->s_dev));
 
         switch (rec->ur_mode /* & S_IFMT ? */) {
@@ -655,7 +655,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         struct dentry *de_new = NULL;
         struct mds_obd *mds = mds_req2mds(req);
         struct lustre_handle tgtlockh, srclockh, oldhandle;
-        int flags, lock_mode, rc = 0, err;
+        int flags = 0, lock_mode, rc = 0, err;
         void *handle;
         __u64 res_id[3] = { 0 };
         ENTRY;
@@ -692,6 +692,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
                              NULL, 0, lock_mode, &tgtlockh);
         if (rc == 0) {
+                flags = 0;
                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
@@ -773,6 +774,7 @@ out_rename_denew:
 out_rename_deold:
         if (!rc) {
                 res_id[0] = de_old->d_inode->i_ino;
+                flags = 0;
                 /* Take an exclusive lock on the resource that we're
                  * about to free, to force everyone to drop their
                  * locks. */