Whamcloud - gitweb
Add flock support.
authoradilger <adilger>
Sat, 7 Aug 2004 00:43:56 +0000 (00:43 +0000)
committeradilger <adilger>
Sat, 7 Aug 2004 00:43:56 +0000 (00:43 +0000)
b=3897

15 files changed:
lustre/include/linux/lustre_log.h
lustre/kernel_patches/patches/ext3-mds-num-2.4.24.patch
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_request.c
lustre/llite/file.c
lustre/lmv/lmv_intent.c
lustre/lov/lov_log.c
lustre/lvfs/llog_cat.c
lustre/lvfs/llog_lvfs.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_lmv.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/osc/osc_create.c

index aebde2c..b6e00cb 100644 (file)
@@ -501,7 +501,7 @@ static inline void llog_create_lock_free(struct llog_create_locks *lcl)
         int size, offset = offsetof(struct llog_create_locks, lcl_locks);
         int i;
         ENTRY;
-                                                                                                                             
+
         for (i = 0; i < lcl->lcl_count; i ++) {
                 if (lcl->lcl_locks[i] != NULL) {
 #ifdef __KERNEL__
index f6561ca..7a97150 100644 (file)
@@ -9,7 +9,7 @@ Index: linux-2.4.24/fs/ext3/namei.c
 +              unsigned type = de->file_type;
 +              __u32 *mds;
 +              mds = (__u32 *)((char *) de + EXT3_DIR_REC_LEN(de->name_len));
-+                if ((type & 128) && EXT3_HAS_INCOMPAT_FEATURE(dir->i_sb,
++              if ((type & 128) && EXT3_HAS_INCOMPAT_FEATURE(dir->i_sb,
 +                              EXT3_FEATURE_INCOMPAT_MDSNUM) &&
 +                              mds[0] != EXT3_SB(dir->i_sb)->s_mdsnum) {
 +                      struct ext3_super_block *es;
@@ -56,7 +56,7 @@ Index: linux-2.4.24/fs/ext3/namei.c
        char            *top;
        
        reclen = EXT3_DIR_REC_LEN(namelen);
-+        if (EXT3_HAS_INCOMPAT_FEATURE(sb, EXT3_FEATURE_INCOMPAT_MDSNUM)
++      if (EXT3_HAS_INCOMPAT_FEATURE(sb, EXT3_FEATURE_INCOMPAT_MDSNUM)
 +                      && (dentry->d_flags & DCACHE_CROSS_REF)
 +                      && (dentry->d_mdsnum != EXT3_SB(sb)->s_mdsnum))
 +              reclen += 8; /* we need space to store mds num */
index ea17da6..800880c 100644 (file)
@@ -39,6 +39,9 @@
 
 static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
 
+int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                            void *data, int flag);
+
 /**
  * list_for_remaining_safe - iterate over the remaining entries in a list
  *              and safeguard against removal of a list entry.
@@ -75,6 +78,8 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                    mode, flags);
 
+        LASSERT(list_empty(&lock->l_flock_waitq));
+
         list_del_init(&lock->l_res_link);
         if (flags == LDLM_FL_WAIT_NOREPROC) {
                 /* client side - set a flag to prevent sending a CANCEL */
@@ -125,6 +130,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         struct ldlm_lock *new = req;
         struct ldlm_lock *new2 = NULL;
         ldlm_mode_t mode = req->l_req_mode;
+        int local = ns->ns_client;
         int added = (mode == LCK_NL);
         int overlaps = 0;
         ENTRY;
@@ -136,8 +142,14 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
 
         *err = ELDLM_OK;
 
-        /* No blocking ASTs are sent for Posix file & record locks */
-        req->l_blocking_ast = NULL;
+        if (local) {
+                /* No blocking ASTs are sent to the clients for
+                 * Posix file & record locks */
+                req->l_blocking_ast = NULL;
+        } else {
+                /* Called on the server for lock cancels. */
+                req->l_blocking_ast = ldlm_flock_blocking_ast;
+        }
 
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
@@ -219,6 +231,10 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 RETURN(LDLM_ITER_STOP);
         }
 
+        /* In case we had slept on this lock request take it off of the
+         * deadlock detection waitq. */
+        list_del_init(&req->l_flock_waitq);
+
         /* Scan the locks owned by this process that overlap this request.
          * We may have to merge or split existing locks. */
 
@@ -412,10 +428,13 @@ ldlm_flock_interrupted_wait(void *data)
         ENTRY;
 
         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
+
+        /* take lock off the deadlock detection waitq. */
+        list_del_init(&lock->l_flock_waitq);
+
         ldlm_lock_decref_internal(lock, lock->l_req_mode);
         ldlm_lock2handle(lock, &lockh);
         rc = ldlm_cli_cancel(&lockh);
-        CDEBUG(D_DLMTRACE, "ldlm_cli_cancel: %d\n", rc);
         EXIT;
 }
 
@@ -450,7 +469,7 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
 
-        ldlm_lock_dump(D_OTHER, lock, 0);
+        ldlm_lock_dump(D_DLMTRACE, lock, 0);
 
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
@@ -486,7 +505,7 @@ granted:
         ns = lock->l_resource->lr_namespace;
         l_lock(&ns->ns_lock);
 
-        /* take data off of deadlock detection waitq. */
+        /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
@@ -519,3 +538,21 @@ granted:
         l_unlock(&ns->ns_lock);
         RETURN(0);
 }
+
+int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                            void *data, int flag)
+{
+        struct ldlm_namespace *ns;
+        ENTRY;
+
+        LASSERT(lock);
+        LASSERT(flag == LDLM_CB_CANCELING);
+
+        ns = lock->l_resource->lr_namespace;
+        
+        /* take lock off the deadlock detection waitq. */
+        l_lock(&ns->ns_lock);
+        list_del_init(&lock->l_flock_waitq);
+        l_unlock(&ns->ns_lock);
+        RETURN(0);
+}
index aa8881a..ec5f8d4 100644 (file)
@@ -237,6 +237,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         struct ldlm_reply *reply;
         int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
         int is_replay = *flags & LDLM_FL_REPLAY;
+        int cleanup_phase = 0;
         ENTRY;
 
         if (exp == NULL) {
@@ -258,7 +259,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
                                         completion, glimpse, data, lvb_len);
                 if (lock == NULL)
-                        GOTO(out_nolock, rc = -ENOMEM);
+                        RETURN(-ENOMEM);
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, mode);
                 ldlm_lock2handle(lock, lockh);
@@ -271,11 +272,14 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 LDLM_DEBUG(lock, "client-side enqueue START");
         }
 
+        /* lock not sent to server yet */
+        cleanup_phase = 2;
+
         if (req == NULL) {
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
                                       size, NULL);
                 if (req == NULL)
-                        GOTO(out_lock, rc = -ENOMEM);
+                        GOTO(cleanup, rc = -ENOMEM);
                 req_passed_in = 0;
         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
                 LBUG();
@@ -319,21 +323,29 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                                 tmplvb = lustre_swab_repbuf(req, 1, lvb_len,
                                                             lvb_swabber);
                                 if (tmplvb == NULL)
-                                        GOTO(out_lock, rc = -EPROTO);
+                                        GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
                                         memcpy(lvb, tmplvb, lvb_len);
                         }
                 }
-                GOTO(out_lock, rc);
+                GOTO(cleanup, rc);
         }
 
         reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                    lustre_swab_ldlm_reply);
         if (reply == NULL) {
                 CERROR("Can't unpack ldlm_reply\n");
-                GOTO(out_lock, rc = -EPROTO);
+                GOTO(cleanup, rc = -EPROTO);
         }
 
+        /* XXX - Phil, wasn't sure if this shoiuld go before or after the
+        /* lustre_swab_repbuf() ? If we can't unpack the reply then we
+        /* don't know what occurred on the server so I think the safest
+        /* bet is to cleanup the lock as if it didn't make it ? */
+
+        /* lock enqueued on the server */
+        cleanup_phase = 1;
+
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
         *flags = reply->lock_flags;
@@ -366,7 +378,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                                            reply->lock_desc.l_resource.lr_name);
                         if (lock->l_resource == NULL) {
                                 LBUG();
-                                GOTO(out_lock, rc = -ENOMEM);
+                                GOTO(cleanup, rc = -ENOMEM);
                         }
                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
                 }
@@ -391,7 +403,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 void *tmplvb;
                 tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
                 if (tmplvb == NULL)
-                        GOTO(out_lock, rc = -EPROTO);
+                        GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
         }
 
@@ -412,13 +424,17 @@ int ldlm_cli_enqueue(struct obd_export *exp,
 
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
- out_lock:
-        if (rc)
-                failed_lock_cleanup(ns, lock, lockh, mode);
-        if (!req_passed_in && req != NULL)
-                ptlrpc_req_finished(req);
+cleanup:
+        switch (cleanup_phase) {
+        case 2:
+                if (rc)
+                        failed_lock_cleanup(ns, lock, lockh, mode);
+        case 1:
+                if (!req_passed_in && req != NULL)
+                        ptlrpc_req_finished(req);
+        }
+
         LDLM_LOCK_PUT(lock);
- out_nolock:
         return rc;
 }
 
index edc28c4..0ac8d65 100644 (file)
@@ -1297,12 +1297,21 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         switch (cmd) {
         case F_SETLKW:
+#ifdef F_SETLKW64
+        case F_SETLKW64:
+#endif
                 flags = 0;
                 break;
         case F_SETLK:
+#ifdef F_SETLK64
+        case F_SETLK64:
+#endif
                 flags = LDLM_FL_BLOCK_NOWAIT;
                 break;
         case F_GETLK:
+#ifdef F_GETLK64
+        case F_GETLK64:
+#endif
                 flags = LDLM_FL_TEST_LOCK;
                 /* Save the old mode so that if the mode in the lock changes we
                  * can decrement the appropriate reader or writer refcount. */
@@ -1318,7 +1327,8 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                flags, mode, flock.l_flock.start, flock.l_flock.end);
 
         obddev = md_get_real_obd(sbi->ll_mdc_exp, NULL, 0);
-        rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, obddev->obd_namespace,
+        rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
+                              obddev->obd_namespace,
                               res_id, LDLM_FLOCK, &flock, mode, &flags,
                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
                               NULL, 0, NULL, &lockh);
@@ -1430,7 +1440,7 @@ struct file_operations ll_file_operations = {
         .sendfile       = generic_file_sendfile,
 #endif
         .fsync          = ll_fsync,
-        //.lock           ll_file_flock
+        .lock           = ll_file_flock
 };
 
 struct inode_operations ll_file_inode_operations = {
index 925e55d..b14df18 100644 (file)
@@ -497,11 +497,11 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
                         lmv_put_obj(obj);
                 }
                 mds = rpfid.mds;
-                
+
                 CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
                        (unsigned long)cfid->mds, (unsigned long)cfid->id,
                        (unsigned long)cfid->generation, mds);
-                
+
                 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name,
                                     len, lmm, lmmsize, cfid, it, flags,
                                     reqp, cb_blocking);
index 9a483bb..f1eecb0 100644 (file)
@@ -85,7 +85,7 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
         down(&lov->lov_llog_sem);
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct obd_device *child =
-                        lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; 
+                        lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd;
                 struct llog_ctxt *cctxt;
                 cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
 
index e797332..b6fbd8e 100644 (file)
@@ -625,8 +625,7 @@ int llog_catalog_cleanup(struct llog_ctxt *ctxt)
         cathandle = ctxt->loc_handle;
         if (cathandle)
                 llog_cat_put(ctxt->loc_handle);
-//        OBD_FREE(ctxt, sizeof(*ctxt));
+
         return 0;
 }
 EXPORT_SYMBOL(llog_catalog_cleanup);
index 718b9a2..938cfc3 100644 (file)
@@ -699,7 +699,7 @@ llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
                 OBD_ALLOC(oa, sizeof(*oa));
                 if (!oa)
                         RETURN(ERR_PTR(-ENOMEM));
-                
+
                 oa->o_gr = FILTER_GROUP_LLOG;
                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
index d9353a6..51e32ef 100644 (file)
@@ -827,7 +827,7 @@ int mds_check_mds_num(struct obd_device *obd, struct inode* inode,
         struct mea *mea = NULL;
         int mea_size, rc = 0;
         ENTRY;
-                                                                                                                                                                                                     
+
         rc = mds_get_lmv_attr(obd, inode, &mea, &mea_size);
         if (rc)
                 RETURN(rc);
@@ -843,7 +843,7 @@ int mds_check_mds_num(struct obd_device *obd, struct inode* inode,
                         rc = -ERESTART;
                 }
         }
-                                                                                                                                                                                                     
+
         if (mea)
                 OBD_FREE(mea, mea_size);
         RETURN(rc);
index 79a4cb5..6c8b722 100644 (file)
@@ -633,7 +633,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
         down(&parent_inode->i_sem);
         if (oa->o_id) {
                 namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
+
                 dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
                 if (IS_ERR(dchild))
                         GOTO(out_pop, rc = PTR_ERR(dchild));
index da60695..8d5a431 100644 (file)
@@ -188,7 +188,7 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
                *mea = NULL;
        } else
                 rc = 0;
-                        
+
        RETURN(rc);
 }
 
index 244b593..0da11b1 100644 (file)
@@ -1251,9 +1251,9 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                 if (req != NULL && req->rq_repmsg != NULL &&
                     (reply_body->valid & OBD_MD_FLEASIZE) &&
                     mds_log_op_unlink(obd, pending_child->d_inode,
-                                                lmm, req->rq_repmsg->buflens[1],
-                                                lustre_msg_buf(req->rq_repmsg, 2, 0),
-                                                req->rq_repmsg->buflens[2], &lcl) > 0) {
+                                      lmm, req->rq_repmsg->buflens[1],
+                                      lustre_msg_buf(req->rq_repmsg, 2, 0),
+                                      req->rq_repmsg->buflens[2], &lcl) > 0) {
                         reply_body->valid |= OBD_MD_FLCOOKIE;
                 }
 
index 2106d7a..d7696b2 100644 (file)
@@ -1691,7 +1691,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                                  rec->ur_namelen, &child_lockh,
                                                  &dchild, LCK_EX,
                                                  MDS_INODELOCK_LOOKUP |
-                                                   MDS_INODELOCK_UPDATE, NULL);
+                                                 MDS_INODELOCK_UPDATE, NULL);
         }
         if (rc)
                 GOTO(cleanup, rc);
index 3cd2175..c1a80c4 100644 (file)
@@ -172,7 +172,7 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
         spin_unlock(&oscc->oscc_lock);
 
         osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
-                      
+
         return have_objs || ost_full || osc_invalid;
 }