Whamcloud - gitweb
Merge b_llpmd into b_devel. The major highlights:
author: phil <phil>
Tue, 9 Sep 2003 03:54:45 +0000 (03:54 +0000)
committer: phil <phil>
Tue, 9 Sep 2003 03:54:45 +0000 (03:54 +0000)
- new I/O backend
- new client page cache and llite/lov/osc plumbing
- pre-creation of OST objects
- most of the OBD protocol now revolves around exports, not obd_devices

29 files changed:
lnet/include/linux/kp30.h
lnet/include/lnet/list.h
lnet/libcfs/debug.c
lnet/utils/debug.c
lustre/include/linux/lustre_otree.h [deleted file]
lustre/kernel_patches/patches/ext3-no-write-super.patch
lustre/kernel_patches/series/hp-pnnl-2.4.20
lustre/kernel_patches/series/rh-2.4.20
lustre/ldlm/ldlm_lib.c
lustre/llite/llite_internal.h
lustre/lov/lov_internal.h
lustre/mdc/mdc_internal.h
lustre/mds/mds_internal.h
lustre/mds/mds_lib.c
lustre/mgmt/mgmt_cli.c
lustre/obdclass/otree.c [deleted file]
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_log.c
lustre/portals/include/linux/kp30.h
lustre/portals/include/portals/list.h
lustre/portals/libcfs/debug.c
lustre/portals/utils/debug.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_lib.c
lustre/tests/multiop.c
lustre/tests/replay-dual.sh
lustre/tests/replay-single.sh
lustre/tests/small_write.c

index 750d16c..a2ef568 100644 (file)
@@ -574,7 +574,7 @@ __s32 portals_debug_copy_to_user(char *buf, unsigned long len);
 #endif
 void portals_debug_msg(int subsys, int mask, char *file, const char *fn,
                        const int line, unsigned long stack,
-                       const char *format, ...)
+                       char *format, ...)
         __attribute__ ((format (printf, 7, 8)));
 #else
 void portals_debug_msg(int subsys, int mask, char *file, const char *fn,
index 78a1e2d..57713cb 100644 (file)
@@ -16,6 +16,8 @@ struct list_head {
        struct list_head *next, *prev;
 };
 
+typedef struct list_head list_t;
+
 #define LIST_HEAD_INIT(name) { &(name), &(name) }
 
 #define LIST_HEAD(name) \
index e9b0e12..16ef401 100644 (file)
@@ -636,7 +636,7 @@ int portals_debug_mark_buffer(char *text)
                 return -EINVAL;
 
         CDEBUG(0, "********************************************************\n");
-        CDEBUG(0, "DEBUG MARKER: %s\n", text);
+        CERROR("DEBUG MARKER: %s\n", text);
         CDEBUG(0, "********************************************************\n");
 
         return 0;
@@ -644,39 +644,104 @@ int portals_debug_mark_buffer(char *text)
 #undef DEBUG_SUBSYSTEM
 #define DEBUG_SUBSYSTEM S_PORTALS
 
+/* This copies a snapshot of the debug buffer into an array of pages
+ * before doing the potentially blocking copy into userspace. It could
+ * (but does not yet) warn userspace if things wrap heavily while it's off copying. */
 __s32 portals_debug_copy_to_user(char *buf, unsigned long len)
 {
         int rc;
-        unsigned long debug_off;
+        unsigned long debug_off, i, off, copied;
         unsigned long flags;
+        struct page *page;
+        LIST_HEAD(my_pages);
+        struct list_head *pos, *n;
 
         if (len < debug_size)
                 return -ENOSPC;
 
-        debug_off = atomic_read(&debug_off_a);
+        for (i = 0 ; i < debug_size; i += PAGE_SIZE) {
+                page = alloc_page(GFP_NOFS);
+                if (page == NULL) {
+                        rc = -ENOMEM;
+                        goto cleanup;
+                }
+                list_add(&page->list, &my_pages);
+        }
+        
         spin_lock_irqsave(&portals_debug_lock, flags);
-        if (debug_wrapped) {
-                /* All of this juggling with the 1s is to keep the trailing nul
-                 * (which falls at debug_buf + debug_off) at the end of what we
-                 * copy into user space */
-                copy_to_user(buf, debug_buf + debug_off + 1,
-                             debug_size - debug_off - 1);
-                copy_to_user(buf + debug_size - debug_off - 1,
-                             debug_buf, debug_off + 1);
-                rc = debug_size;
-        } else {
-                copy_to_user(buf, debug_buf, debug_off);
-                rc = debug_off;
+        debug_off = atomic_read(&debug_off_a);
+        
+        /* Sigh. If the buffer is empty, then skip to the end. */
+        if (debug_off == 0 && !debug_wrapped) {
+                spin_unlock_irqrestore(&portals_debug_lock, flags);
+                rc = 0;
+                goto cleanup;
         }
+
+        if (debug_wrapped)
+                off = debug_off + 1;
+        else 
+                off = 0;
+        copied = 0;
+        list_for_each(pos, &my_pages) {
+                unsigned long to_copy;
+                page = list_entry(pos, struct page, list);
+
+                to_copy = min(debug_size - off, PAGE_SIZE);
+                if (to_copy == 0) {
+                        off = 0;
+                        to_copy = min(debug_size - off, PAGE_SIZE);
+                }
+finish_partial:
+                memcpy(kmap(page), debug_buf + off, to_copy);
+                kunmap(page);
+                copied += to_copy;
+                if (copied >= (debug_wrapped ? debug_size : debug_off))
+                        break;
+                        
+                off += to_copy;
+                if (off >= debug_size) {
+                        off = 0;
+                        if (to_copy != PAGE_SIZE) {
+                                to_copy = PAGE_SIZE - to_copy;
+                                goto finish_partial;
+                        }
+                }
+        }
+
         spin_unlock_irqrestore(&portals_debug_lock, flags);
 
+        off = 0;
+        list_for_each(pos, &my_pages) {
+                unsigned long to_copy;
+                page = list_entry(pos, struct page, list);
+
+                to_copy = min(copied - off, PAGE_SIZE);
+                rc = copy_to_user(buf + off, kmap(page), to_copy);
+                kunmap(page);
+                if (rc) {
+                        rc = -EFAULT;
+                        goto cleanup;
+                }
+                off += to_copy;
+                if (off >= copied)
+                        break;
+        }
+        rc = copied;
+        
+cleanup:
+        list_for_each_safe(pos, n, &my_pages) {
+                page = list_entry(pos, struct page, list);
+                list_del(&page->list);
+                __free_page(page);
+        }
         return rc;
 }
 
 /* FIXME: I'm not very smart; someone smarter should make this better. */
 void
 portals_debug_msg(int subsys, int mask, char *file, const char *fn,
-                  const int line, unsigned long stack, const char *format, ...)
+                  const int line, unsigned long stack, char *format, ...)
 {
         va_list       ap;
         unsigned long flags;
@@ -731,33 +796,34 @@ portals_debug_msg(int subsys, int mask, char *file, const char *fn,
         do_gettimeofday(&tv);
 
         prefix_nob = snprintf(debug_buf + debug_off, max_nob,
-                              "%06x:%06x:%d:%lu.%06lu ",
+                              "%06x:%06x:%d:%lu.%06lu :",
                               subsys, mask, smp_processor_id(),
                               tv.tv_sec, tv.tv_usec);
         max_nob -= prefix_nob;
+        if(*(format + strlen(format) - 1) == '\n')
+                *(format + strlen(format) - 1) = ':';
+           
+        va_start(ap, format);
+        msg_nob = vsnprintf(debug_buf + debug_off + prefix_nob ,
+                            max_nob, format, ap);
+        max_nob -= msg_nob;
+        va_end(ap);
 
 #if defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,20))
-        msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
-                           "(%s:%d:%s() %d | %d+%lu)",
+        msg_nob += snprintf(debug_buf + debug_off + prefix_nob + msg_nob, max_nob,
+                           "(%s:%d:%s() %d | %d+%lu)\n",
                            file, line, fn, current->pid,
                            current->thread.extern_pid, stack);
 #elif defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
-                           "(%s:%d:%s() %d | %d+%lu)",
+        msg_nob += snprintf(debug_buf + debug_off + prefix_nob + msg_nob, max_nob,
+                           "(%s:%d:%s() %d | %d+%lu)\n",
                            file, line, fn, current->pid,
                            current->thread.mode.tt.extern_pid, stack);
 #else
-        msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
-                           "(%s:%d:%s() %d+%lu)",
+        msg_nob += snprintf(debug_buf + debug_off + prefix_nob + msg_nob, max_nob,
+                           "(%s:%d:%s() %d+%lu)\n",
                            file, line, fn, current->pid, stack);
 #endif
-        max_nob -= msg_nob;
-
-        va_start(ap, format);
-        msg_nob += vsnprintf(debug_buf + debug_off + prefix_nob + msg_nob,
-                             max_nob, format, ap);
-        max_nob -= msg_nob;
-        va_end(ap);
 
         /* Print to console, while msg is contiguous in debug_buf */
         /* NB safely terminated see above */
index 0a009d2..5921259 100644 (file)
@@ -56,6 +56,8 @@ static int max = 8192;
 static int subsystem_mask = ~0;
 static int debug_mask = ~0;
 
+#define MAX_MARK_SIZE 100
+
 static const char *portal_debug_subsystems[] =
         {"undefined", "mdc", "mds", "osc", "ost", "class", "log", "llite",
          "rpc", "mgmt", "portals", "socknal", "qswnal", "pinger", "filter",
@@ -480,22 +482,29 @@ int jt_dbg_clear_debug_buf(int argc, char **argv)
 
 int jt_dbg_mark_debug_buf(int argc, char **argv)
 {
-        int rc;
+        int rc, max_size = MAX_MARK_SIZE-1;
         struct portal_ioctl_data data;
         char *text;
         time_t now = time(NULL);
 
-        if (argc > 2) {
-                fprintf(stderr, "usage: %s [marker text]\n", argv[0]);
-                return 0;
-        }
-
-        if (argc == 2) {
-                text = argv[1];
+        if (argc > 1) {
+                int counter;
+                text = malloc(MAX_MARK_SIZE);
+                strncpy(text, argv[1], max_size);
+                max_size-=strlen(argv[1]);
+                for(counter = 2; (counter < argc) && (max_size > 0) ; counter++){
+                        strncat(text, " ", 1);
+                        max_size-=1;
+                        strncat(text, argv[counter], max_size);
+                        max_size-=strlen(argv[counter]);
+                }
         } else {
                 text = ctime(&now);
                 text[strlen(text) - 1] = '\0'; /* stupid \n */
         }
+        if (!max_size) {
+                text[MAX_MARK_SIZE - 1] = '\0';
+        }
 
         memset(&data, 0, sizeof(data));
         data.ioc_inllen1 = strlen(text) + 1;
diff --git a/lustre/include/linux/lustre_otree.h b/lustre/include/linux/lustre_otree.h
deleted file mode 100644 (file)
index 3d8d510..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- */
-#ifndef _LUSTRE_OTREE_H
-#define _LUSTRE_OTREE_H
-
-/* XXX ok, I can't make sense of our header nest right now.. */
-#ifdef __KERNEL__
-#include <linux/rbtree.h>
-#include <linux/spinlock.h>
-
-struct otree {
-        rb_root_t       ot_root;
-        spinlock_t      ot_lock;
-        unsigned long   ot_num_marked;
-};
-#else
-struct otree {
-        unsigned long   lalala;
-};
-#endif
-
-int ot_mark_offset(struct otree *ot, unsigned long offset);
-int ot_clear_extent(struct otree *ot, unsigned long start, unsigned long end);
-int ot_find_marked_extent(struct otree *ot, unsigned long *start,
-                          unsigned long *end);
-int ot_last_marked(struct otree *ot, unsigned long *last);
-unsigned long ot_num_marked(struct otree *ot);
-void ot_init(struct otree *ot);
-
-#endif
index b096276..d2dcdae 100644 (file)
@@ -1,16 +1,22 @@
+ 0 files changed
 
---- linux-2.4.18/fs/ext3/super.c~ext3-no-write-super   Mon Jul 28 14:13:05 2003
-+++ linux-2.4.18-alexey/fs/ext3/super.c        Mon Jul 28 16:14:11 2003
-@@ -1818,7 +1818,10 @@ void ext3_write_super (struct super_bloc
+--- linux-2.4.20/fs/ext3/super.c~ext3-no-write-super   2003-08-11 13:20:17.000000000 +0400
++++ linux-2.4.20-alexey/fs/ext3/super.c        2003-08-11 13:31:35.000000000 +0400
+@@ -1849,7 +1849,6 @@ void ext3_write_super (struct super_bloc
        if (down_trylock(&sb->s_lock) == 0)
-               BUG();
+               BUG();          /* aviro detector */
        sb->s_dirt = 0;
-+#if 0
-+      /* we really don't need this, jbd makes periodical commits by itself */
-       log_start_commit(EXT3_SB(sb)->s_journal, NULL);
-+#endif
- }
+-      target = log_start_commit(EXT3_SB(sb)->s_journal, NULL);
  
- static int ext3_sync_fs(struct super_block *sb)
+       /*
+        * Tricky --- if we are unmounting, the write really does need
+@@ -1857,6 +1856,7 @@ void ext3_write_super (struct super_bloc
+        * sb->s_root.
+        */
+       if (do_sync_supers || !sb->s_root) {
++              target = log_start_commit(EXT3_SB(sb)->s_journal, NULL);
+               unlock_super(sb);
+               log_wait_commit(EXT3_SB(sb)->s_journal, target);
+               lock_super(sb);
 
 _
index e43b096..cf6f36a 100644 (file)
@@ -25,6 +25,9 @@ ext3-map_inode_page.patch
 ext3-error-export.patch
 iopen-2.4.20.patch
 tcp-zero-copy.patch
+add_page_private.patch
+socket-exports-vanilla.patch
+removepage-2.4.20.patch
 jbd-ctx_switch.patch
 jbd-flushtime.patch
 jbd-get_write_access.patch
index 970061d..519d8e7 100644 (file)
@@ -22,5 +22,13 @@ ext3-san-2.4.20.patch
 ext3-map_inode_page.patch
 ext3-error-export.patch
 iopen-2.4.20.patch
+jbd-dont-account-blocks-twice.patch
+jbd-commit-tricks.patch
+ext3-o_direct-1.2.4.20-rh.patch 
+ext3-no-write-super-chaos.patch
+dynamic-locks-2.4.20-rh.patch 
+vfs-pdirops-2.4.20-rh.patch 
+ext3-pdirops-2.4.18-chaos.patch
 tcp_zero_copy_2.4.20_chaos.patch
 gpl_header-chaos-2.4.20.patch
+add_page_private.patch
index 5c6b620..8520ece 100644 (file)
@@ -52,6 +52,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
         cli->cl_conn_count++;
         if (cli->cl_conn_count > 1)
                 GOTO(out_sem, rc);
+        exp = class_conn2export(dlm_handle);
 
         if (obd->obd_namespace != NULL)
                 CERROR("already have namespace!\n");
@@ -71,9 +72,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
 
         LASSERT (imp->imp_state == LUSTRE_IMP_FULL);
 
-        exp = class_conn2export(dlm_handle);
         exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
-        class_export_put(exp);
 
         if (imp->imp_replayable) {
                 CDEBUG(D_HA, "connected to replayable target: %s\n",
@@ -92,24 +91,27 @@ out_ldlm:
                 obd->obd_namespace = NULL;
 out_disco:
                 cli->cl_conn_count--;
-                class_disconnect(dlm_handle, 0);
+                class_disconnect(exp, 0);
+        } else {
+                class_export_put(exp);
         }
 out_sem:
         up(&cli->cl_sem);
         return rc;
 }
 
-int client_disconnect_import(struct lustre_handle *dlm_handle, int failover)
+int client_disconnect_export(struct obd_export *exp, int failover)
 {
-        struct obd_device *obd = class_conn2obd(dlm_handle);
+        struct obd_device *obd = class_exp2obd(exp);
         struct client_obd *cli = &obd->u.cli;
         struct obd_import *imp = cli->cl_import;
         int rc = 0, err;
         ENTRY;
 
         if (!obd) {
-                CERROR("invalid connection for disconnect: cookie "LPX64"\n",
-                       dlm_handle ? dlm_handle->cookie : -1UL);
+                CERROR("invalid export for disconnect: "
+                       "exp %p cookie "LPX64"\n", exp, 
+                       exp ? exp->exp_handle.h_cookie : -1UL);
                 RETURN(-EINVAL);
         }
 
@@ -136,19 +138,16 @@ int client_disconnect_import(struct lustre_handle *dlm_handle, int failover)
         }
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
-        if (obd->obd_no_recov) {
+        if (obd->obd_no_recov)
                 ptlrpc_set_import_active(imp, 0);
-        } else {
+        else
                 rc = ptlrpc_disconnect_import(imp);
-        }
-        
-        imp->imp_state = LUSTRE_IMP_NEW;
 
+        imp->imp_state = LUSTRE_IMP_NEW;
 
         EXIT;
-
  out_no_disconnect:
-        err = class_disconnect(dlm_handle, 0);
+        err = class_disconnect(exp, 0);
         if (!rc && err)
                 rc = err;
  out_sem:
@@ -353,7 +352,7 @@ out:
 
 int target_handle_disconnect(struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn = &req->rq_reqmsg->handle;
+        struct obd_export *export;
         struct obd_import *dlmimp;
         int rc;
         ENTRY;
@@ -362,7 +361,10 @@ int target_handle_disconnect(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        req->rq_status = obd_disconnect(conn, 0);
+        /* Create an export reference to disconnect, so the rq_export
+         * ref is not destroyed. See class_disconnect() for more info. */
+        export = class_export_get(req->rq_export);
+        req->rq_status = obd_disconnect(export, 0);
 
         dlmimp = req->rq_export->exp_ldlm_data.led_import;
         class_destroy_import(dlmimp);
@@ -437,9 +439,16 @@ void target_abort_recovery(void *data)
 
         obd->obd_recovering = obd->obd_abort_recovery = 0;
         obd->obd_recoverable_clients = 0;
+
         wake_up(&obd->obd_next_transno_waitq);
         target_cancel_recovery_timer(obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* XXX can't call this with the spin_lock_bh lock held, but it
+           probably should be protected, somehow. */
+        if (OBT(obd) && OBP(obd, postsetup))
+                OBP(obd, postsetup)(obd);
+
         class_disconnect_exports(obd, 0);
         abort_delayed_replies(obd);
         abort_recovery_queue(obd);
@@ -493,12 +502,16 @@ static int check_for_next_transno(struct obd_device *obd)
         struct ptlrpc_request *req;
         int wake_up;
 
+        /* XXX shouldn't we take obd->obd_processing_task_lock to check these
+           flags and the recovery_queue? */
+        if (obd->obd_abort_recovery || !obd->obd_recovering)
+                return 1;
+
         req = list_entry(obd->obd_recovery_queue.next,
                          struct ptlrpc_request, rq_list);
         LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
 
-        wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
-                (obd->obd_recovering) == 0;
+        wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno;
         CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
                req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
                obd->obd_recovering, wake_up);
@@ -692,6 +705,10 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 CERROR("%s: all clients recovered, sending delayed replies\n",
                        obd->obd_name);
                 obd->obd_recovering = 0;
+
+                if (OBT(obd) && OBP(obd, postsetup))
+                        OBP(obd, postsetup)(obd);
+
                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
                         DEBUG_REQ(D_ERROR, req, "delayed:");
@@ -778,9 +795,16 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         wait_queue_t commit_wait;
         struct obd_device *obd =
                 req->rq_export ? req->rq_export->exp_obd : NULL;
-        struct obd_export *exp =
-                (req->rq_export && req->rq_ack_locks[0].mode) ?
-                req->rq_export : NULL;
+        struct obd_export *exp = NULL;
+
+        if (req->rq_export) {
+                for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
+                        if (req->rq_ack_locks[i].mode) {
+                                exp = req->rq_export;
+                                break;
+                        }
+                }
+        }
 
         if (exp) {
                 exp->exp_outstanding_reply = req;
@@ -848,9 +872,10 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
         exp->exp_outstanding_reply = NULL;
 
-        for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
+        for (ack_lock = req->rq_ack_locks, i = 0;
+             i < REQ_MAX_ACK_LOCKS; i++, ack_lock++) {
                 if (!ack_lock->mode)
-                        break;
+                        continue;
                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
         }
 }
@@ -859,3 +884,21 @@ int target_handle_ping(struct ptlrpc_request *req)
 {
         return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
 }
+
+void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
+                                struct lustre_handle *lock, int mode)
+{
+        int i;
+
+        for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
+                if (req->rq_ack_locks[i].mode)
+                        continue;
+                memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
+                req->rq_ack_locks[i].mode = mode;
+                return &req->rq_ack_locks[i];
+        }
+        CERROR("no space for lock in struct ptlrpc_request\n");
+        LBUG();
+        return NULL;
+}
+
index 848b77e..cb68b46 100644 (file)
@@ -33,105 +33,101 @@ static inline struct inode *ll_info2i(struct ll_inode_info *lli)
 #endif
 }
 
-/* llite/commit_callback.c */
-int ll_commitcbd_setup(struct ll_sb_info *);
-int ll_commitcbd_cleanup(struct ll_sb_info *);
+static inline void ll_i2uctxt(struct ll_uctxt *ctxt, struct inode *i1,
+                              struct inode *i2)
+{
+
+        LASSERT(i1);
+        LASSERT(ctxt);
+
+        if (in_group_p(i1->i_gid))
+                ctxt->gid1 = i1->i_gid;
+        else
+                ctxt->gid1 = -1;
+
+        if (i2) {
+                if (in_group_p(i2->i_gid))
+                        ctxt->gid2 = i2->i_gid;
+                else
+                        ctxt->gid2 = -1;
+        } else 
+                ctxt->gid2 = 0;
+}
+
+struct it_cb_data {
+       struct inode *icbd_parent;
+       struct dentry **icbd_childp;
+       obd_id hash;
+};
 
-/* lproc_llite.c */
+/* llite/lproc_llite.c */
 int lprocfs_register_mountpoint(struct proc_dir_entry *parent,
                                 struct super_block *sb, char *osc, char *mdc);
 void lprocfs_unregister_mountpoint(struct ll_sb_info *sbi);
 
+/* llite/dir.c */
+extern struct file_operations ll_dir_operations;
+extern struct inode_operations ll_dir_inode_operations;
+
 /* llite/namei.c */
+int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
                       struct lustre_md *lic);
 struct dentry *ll_find_alias(struct inode *, struct dentry *);
-int ll_it_open_error(int phase, struct lookup_intent *it);
-int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
-                         int flags, void *opaque);
+int ll_mdc_cancel_unused(struct lustre_handle *, struct inode *, int flags,
+                         void *opaque);
+int ll_mdc_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
+                        void *data, int flag);
+void ll_prepare_mdc_op_data(struct mdc_op_data *,
+                            struct inode *i1, struct inode *i2,
+                            const char *name, int namelen, int mode);
 
 /* llite/rw.c */
-void ll_end_writeback(struct inode *, struct page *);
-
-void ll_remove_dirty(struct inode *inode, unsigned long start,
-                     unsigned long end);
-int ll_rd_dirty_pages(char *page, char **start, off_t off, int count,
-                      int *eof, void *data);
-int ll_rd_max_dirty_pages(char *page, char **start, off_t off, int count,
-                          int *eof, void *data);
-int ll_wr_max_dirty_pages(struct file *file, const char *buffer,
-                          unsigned long count, void *data);
-int ll_clear_dirty_pages(struct lustre_handle *conn, struct lov_stripe_md *lsm,
-                         unsigned long start, unsigned long end);
-int ll_mark_dirty_page(struct lustre_handle *conn, struct lov_stripe_md *lsm,
-                       unsigned long index);
-
-/* llite/file.c */
-extern int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *);
-
-/* llite/super.c */
-int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
-int ll_setattr(struct dentry *de, struct iattr *attr);
-
-/* iod.c */
-#define IO_STAT_ADD(FIS, STAT, VAL) do {        \
-        struct file_io_stats *_fis_ = (FIS);    \
-        spin_lock(&_fis_->fis_lock);            \
-        _fis_->fis_##STAT += VAL;               \
-        spin_unlock(&_fis_->fis_lock);          \
-} while (0)
-
-#define INODE_IO_STAT_ADD(INODE, STAT, VAL)        \
-        IO_STAT_ADD(&ll_i2sbi(INODE)->ll_iostats, STAT, VAL)
-
-#define PAGE_IO_STAT_ADD(PAGE, STAT, VAL)               \
-        INODE_IO_STAT_ADD((PAGE)->mapping, STAT, VAL)
-
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
-/* XXX lliod needs more work in 2.5 before being proven and brought back
- * to 2.4, it'll at least require a patch to introduce page->private */
-int lliod_start(struct ll_sb_info *sbi, struct inode *inode);
-void lliod_stop(struct ll_sb_info *sbi);
-#else
-#define lliod_start(sbi, inode) ({int _ret = 0; (void)sbi, (void)inode; _ret;})
-#define lliod_stop(sbi) do { (void)sbi; } while (0)
+int ll_prepare_write(struct file *file, struct page *page, unsigned from,
+                            unsigned to);
+int ll_commit_write(struct file *file, struct page *page, unsigned from,
+                    unsigned to);
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+#define ll_complete_writeback ll_complete_writepage_24
+void ll_complete_writepage_24(struct obd_client_page *ocp, int rc);
+#else 
+#define ll_complete_writeback ll_complete_writepage_26
+void ll_complete_writepage_26(struct obd_client_page *ocp, int rc);
 #endif
-void lliod_wakeup(struct inode *inode);
-void lliod_give_plist(struct inode *inode, struct plist *plist, int rw);
-void lliod_give_page(struct inode *inode, struct page *page, int rw);
-void plist_init(struct plist *plist); /* for lli initialization.. */
-
-void ll_lldo_init(struct ll_dirty_offsets *lldo);
-void ll_record_dirty(struct inode *inode, unsigned long offset);
-void ll_remove_dirty(struct inode *inode, unsigned long start,
-                     unsigned long end);
-int ll_find_dirty(struct ll_dirty_offsets *lldo, unsigned long *start,
-                  unsigned long *end);
-int ll_farthest_dirty(struct ll_dirty_offsets *lldo, unsigned long *farthest);
+int ll_sync_page(struct page *page);
+int ll_ocp_update_obdo(struct obd_client_page *ocp, int cmd, struct obdo *oa);
+void ll_removepage(struct page *page);
+int ll_readpage(struct file *file, struct page *page);
 
+void ll_truncate(struct inode *inode);
 
-/* llite/super25.c */
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+/* llite/file.c */
+extern struct file_operations ll_file_operations;
+extern struct inode_operations ll_file_inode_operations;
+extern struct inode_operations ll_special_inode_operations;
+extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *);
+int ll_extent_lock(struct ll_file_data *, struct inode *,
+                   struct lov_stripe_md *, int mode, struct ldlm_extent *,
+                   struct lustre_handle *);
+int ll_extent_unlock(struct ll_file_data *, struct inode *,
+                     struct lov_stripe_md *, int mode, struct lustre_handle *);
+int ll_file_open(struct inode *inode, struct file *file);
+int ll_file_release(struct inode *inode, struct file *file);
+int ll_extent_lock_no_validate(struct ll_file_data *, struct inode *,
+                               struct lov_stripe_md *, int mode,
+                               struct ldlm_extent *, struct lustre_handle *,
+                               int ast_flags);
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 int ll_getattr(struct vfsmount *mnt, struct dentry *de,
-               struct lookup_intent *it, 
-               struct kstat *stat);
+               struct lookup_intent *it, struct kstat *stat);
 #endif
 
-
 /* llite/dcache.c */
 void ll_intent_release(struct lookup_intent *);
 extern void ll_set_dd(struct dentry *de);
 void ll_unhash_aliases(struct inode *);
-
-/* llite/rw.c */
-void ll_truncate(struct inode *inode);
-void ll_end_writeback(struct inode *inode, struct page *page);
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-int ll_check_dirty(struct super_block *sb);
-int ll_batch_writepage(struct inode *inode, struct obdo *oa, struct page *page);
-#else
-#define ll_check_dirty(SB) do { (void)SB; } while (0)
-#endif
+void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 
 /* llite/llite_lib.c */
 
@@ -146,10 +142,11 @@ void ll_put_super(struct super_block *sb);
 struct inode *ll_inode_from_lock(struct ldlm_lock *lock);
 void ll_clear_inode(struct inode *inode);
 int ll_attr2inode(struct inode *inode, struct iattr *attr, int trunc);
-int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
 int ll_setattr_raw(struct inode *inode, struct iattr *attr);
 int ll_setattr(struct dentry *de, struct iattr *attr);
 int ll_statfs(struct super_block *sb, struct kstatfs *sfs);
+int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
+                       unsigned long maxage);
 void ll_update_inode(struct inode *inode, struct mds_body *body,
                      struct lov_stripe_md *lsm);
 int it_disposition(struct lookup_intent *it, int flag);
@@ -157,4 +154,7 @@ void it_set_disposition(struct lookup_intent *it, int flag);
 void ll_read_inode2(struct inode *inode, void *opaque);
 void ll_umount_begin(struct super_block *sb);
 
+/* llite/symlink.c */
+extern struct inode_operations ll_fast_symlink_inode_operations;
+
 #endif /* LLITE_INTERNAL_H */
index f3bc191..f9b629e 100644 (file)
@@ -13,13 +13,13 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count);
 void lov_free_memmd(struct lov_stripe_md **lsmp);
 
 /* lov_pack.c */
-int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
+int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm,
                struct lov_stripe_md *lsm);
-int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
+int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsm,
                  struct lov_mds_md *lmm, int lmmsize);
-int lov_setstripe(struct lustre_handle *conn,
+int lov_setstripe(struct obd_export *exp,
                   struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu);
-int lov_getstripe(struct lustre_handle *conn,
+int lov_getstripe(struct obd_export *exp,
                   struct lov_stripe_md *lsm, struct lov_mds_md *lmmu);
 
 /* lproc_lov.c */
index 0ac87a4..6d212fd 100644 (file)
@@ -23,3 +23,47 @@ void mdc_link_pack(struct ptlrpc_request *req, int offset,
 void mdc_rename_pack(struct ptlrpc_request *req, int offset,
                      struct mdc_op_data *data,
                      const char *old, int oldlen, const char *new, int newlen);
+
+/* Pairs an obd_client_handle pointer with a ptlrpc_request pointer.
+ * NOTE(review): the field names suggest this ties an open handle to its
+ * close request; the users are not visible in this header, so confirm
+ * against the mdc open/close code before relying on that reading. */
+struct mdc_open_data {
+        struct obd_client_handle *mod_och;
+        struct ptlrpc_request    *mod_close_req;
+};
+
+/* Lock state manipulated by mdc_init_rpc_lock(), mdc_get_rpc_lock() and
+ * mdc_put_rpc_lock(). */
+struct mdc_rpc_lock {
+        struct semaphore rpcl_sem;      /* initialised with a count of 1 */
+        struct lookup_intent *rpcl_it;  /* intent recorded by the current
+                                         * holder; NULL if none was given */
+};
+
+/* Put @lck into its unlocked state: semaphore count 1, no recorded intent.
+ * Unlike the get/put helpers, this one is defined outside the
+ * #ifdef __KERNEL__ section. */
+static inline void mdc_init_rpc_lock(struct mdc_rpc_lock *lck)
+{
+        sema_init(&lck->rpcl_sem, 1);
+        lck->rpcl_it = NULL;
+}
+
+#ifdef __KERNEL__
+/* Acquire @lck by taking its semaphore with down().  When @it is non-NULL
+ * it is remembered in lck->rpcl_it so that mdc_put_rpc_lock() can assert
+ * it is released with the same intent; a NULL @it leaves rpcl_it alone.
+ * Note that ENTRY has no matching EXIT in this function. */
+static inline void mdc_get_rpc_lock(struct mdc_rpc_lock *lck, 
+                                    struct lookup_intent *it)
+{
+        ENTRY;
+        down(&lck->rpcl_sem);
+        if (it) { 
+                lck->rpcl_it = it;
+        }
+}
+
+/* Release @lck.  @it must equal the intent currently recorded in
+ * lck->rpcl_it (both NULL, or the same pointer); this is enforced with
+ * LASSERT.  A non-NULL recorded intent is cleared before the semaphore is
+ * released with up().
+ * Note that EXIT is invoked at the top, before any of the work is done. */
+static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck, 
+                                    struct lookup_intent *it)
+{
+        EXIT;
+        if (it == NULL) {
+                /* with it == NULL this asserts that no intent is recorded */
+                LASSERT(it == lck->rpcl_it);
+                up(&lck->rpcl_sem);
+                return;
+        }
+        /* NOTE(review): always true here, the NULL case returned above */
+        if (it) {
+                LASSERT(it == lck->rpcl_it);
+                lck->rpcl_it = NULL;
+                up(&lck->rpcl_sem);
+        }
+}
+#endif
index 157d6a9..3950b05 100644 (file)
@@ -1,3 +1,7 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
 #ifndef _MDS_INTERNAL_H
 #define _MDS_INTERNAL_H
 static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
@@ -5,13 +9,14 @@ static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
         return &req->rq_export->exp_obd->u.mds;
 }
 
+
 /* mds/mds_fs.c */
 struct llog_handle *mds_log_create(struct obd_device *obd);
 int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle);
 struct llog_handle *mds_log_open(struct obd_device *obd,
                                  struct llog_cookie *logcookie);
 struct llog_handle *mds_get_catalog(struct obd_device *obd);
-void mds_put_catalog(struct llog_handle *cathandle);
+void mds_put_catalog(struct obd_device *obd, struct llog_handle *cathandle);
 
 
 /* mds/mds_reint.c */
@@ -26,22 +31,34 @@ int mds_update_unpack(struct ptlrpc_request *, int offset,
                       struct mds_update_record *);
 
 /* mds/mds_lov.c */
+int mds_lov_connect(struct obd_device *obd);
 int mds_get_lovtgts(struct mds_obd *mds, int tgt_count,
                     struct obd_uuid *uuidarray);
+int mds_lov_write_objids(struct obd_device *obd);
+void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
+int mds_lov_set_growth(struct mds_obd *mds, int count);
+int mds_lov_set_nextid(struct obd_device *obd);
 
 /* mds/mds_open.c */
+int mds_query_write_access(struct inode *inode);
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *);
 int mds_pin(struct ptlrpc_request *req);
 int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
-                 struct mds_file_data *mfd, int unlink_orphan);
+                  struct mds_file_data *mfd, int unlink_orphan);
 int mds_close(struct ptlrpc_request *req);
 
 
 /* mds/mds_fs.c */
 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
-                  struct mds_export_data *med, int cl_off);
+                   struct mds_export_data *med, int cl_off);
 int mds_client_free(struct obd_export *exp, int clear_client);
+int mds_object_create(struct obd_export *exp, struct obdo *oa,
+                      struct lov_stripe_md **ea, struct obd_trans_info *oti);
+
+/* mds/handler.c */
+extern int mds_iocontrol(unsigned int cmd, struct obd_export *exp,
+                         int len, void *karg, void *uarg);
 
 #ifdef __KERNEL__
 void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode);
index cc03390..a9b6f3f 100644 (file)
@@ -59,7 +59,7 @@ void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode)
 /* Note that we can copy all of the fields, just some will not be "valid" */
 void mds_pack_inode2body(struct mds_body *b, struct inode *inode)
 {
-        b->valid = OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID |
+        b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID |
                 OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
 
         if (!S_ISREG(inode->i_mode))
@@ -266,6 +266,45 @@ static int mds_rename_unpack(struct ptlrpc_request *req, int offset,
         RETURN(0);
 }
 
+/* Unpack an open request into the update record @r.
+ *
+ * Request buffer @offset holds a struct mds_rec_create, obtained through
+ * lustre_swab_reqbuf() with lustre_swab_mds_rec_create; buffer @offset + 1
+ * holds the name; buffer @offset + 2, when the message has that many
+ * buffers, holds EA data.
+ *
+ * Returns 0 on success, or -EFAULT if the record, the name, or a present
+ * EA buffer cannot be obtained from the request message. */
+static int mds_open_unpack(struct ptlrpc_request *req, int offset,
+                           struct mds_update_record *r)
+{
+        struct mds_rec_create *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
+                                  lustre_swab_mds_rec_create);
+        if (rec == NULL)
+                RETURN (-EFAULT);
+
+        r->ur_fsuid = rec->cr_fsuid;
+        r->ur_fsgid = rec->cr_fsgid;
+        r->ur_cap = rec->cr_cap;
+        /* the fids are not copied: ur_fid1/ur_fid2 point into rec */
+        r->ur_fid1 = &rec->cr_fid;
+        r->ur_fid2 = &rec->cr_replayfid;
+        r->ur_mode = rec->cr_mode;
+        r->ur_rdev = rec->cr_rdev;
+        r->ur_time = rec->cr_time;
+        r->ur_flags = rec->cr_flags;
+        r->ur_suppgid1 = rec->cr_suppgid;
+        r->ur_suppgid2 = -1;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        if (r->ur_name == NULL)
+                RETURN (-EFAULT);
+        r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
+
+        /* NOTE(review): this assertion runs before the bufcount check
+         * below, i.e. also when buffer offset + 2 is absent -- confirm
+         * that LASSERT_REQSWAB tolerates an index past bufcount */
+        LASSERT_REQSWAB (req, offset + 2);
+        /* NOTE(review): when the EA buffer is absent, ur_eadata and
+         * ur_eadatalen are not written here; presumably the caller has
+         * zeroed *r -- verify */
+        if (req->rq_reqmsg->bufcount > offset + 2) {
+                r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0);
+                if (r->ur_eadata == NULL)
+                        RETURN (-EFAULT);
+                r->ur_eadatalen = req->rq_reqmsg->buflens[offset + 2];
+        }
+        RETURN(0);
+}
+
 typedef int (*update_unpacker)(struct ptlrpc_request *req, int offset,
                                struct mds_update_record *r);
 
@@ -275,7 +314,7 @@ static update_unpacker mds_unpackers[REINT_MAX + 1] = {
         [REINT_LINK] mds_link_unpack,
         [REINT_UNLINK] mds_unlink_unpack,
         [REINT_RENAME] mds_rename_unpack,
-        [REINT_OPEN] mds_create_unpack,
+        [REINT_OPEN] mds_open_unpack,
 };
 
 int mds_update_unpack(struct ptlrpc_request *req, int offset,
index 9d4183a..5cbcfae 100644 (file)
@@ -101,20 +101,22 @@ static int mgmtcli_connect_to_svc(struct obd_device *obd)
         struct ptlrpc_svc_data svc_data;
         struct ptlrpc_thread *thread;
         struct l_wait_info lwi = { 0 };
+        struct lustre_handle conn = {0, };
         ENTRY;
 
         /* Connect to ourselves, and thusly to the mgmt service. */
-        rc = client_connect_import(&mc->mc_ping_handle, obd, &obd->obd_uuid);
+        rc = client_connect_import(&conn, obd, &obd->obd_uuid);
         if (rc) {
                 CERROR("failed to connect to mgmt svc: %d\n", rc);
                 (void)client_obd_cleanup(obd, 0);
                 RETURN(rc);
         }
+        mc->mc_ping_exp = class_conn2export(&conn);
         
         LASSERT(mc->mc_ping_thread == NULL);
         OBD_ALLOC(thread, sizeof (*thread));
         if (thread == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out, rc = -ENOMEM);
         mc->mc_ping_thread = thread;
         init_waitqueue_head(&thread->t_ctl_waitq);
 
@@ -122,11 +124,12 @@ static int mgmtcli_connect_to_svc(struct obd_device *obd)
         svc_data.thread = thread;
 
         rc = kernel_thread(mgmtcli_pinger_main, &svc_data, CLONE_VM | CLONE_FILES);
+out:
         if (rc < 0) {
                 CERROR("can't start thread to ping mgmt svc %s: %d\n",
                        mc->mc_import->imp_target_uuid.uuid, rc);
                 OBD_FREE(mc->mc_ping_thread, sizeof (*mc->mc_ping_thread));
-                (void)client_disconnect_import(&mc->mc_ping_handle, 0);
+                (void)client_disconnect_import(mc->mc_ping_exp, 0);
                 RETURN(rc);
         }
         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
@@ -143,7 +146,7 @@ static int mgmtcli_disconnect_from_svc(struct obd_device *obd)
         int rc;
 
         ENTRY;
-        rc = client_disconnect_import(&mc->mc_ping_handle, 0);
+        rc = client_disconnect_import(mc->mc_ping_exp, 0);
         if (rc) {
                 CERROR("can't disconnect from %s: %d (%s)\n",
                        imp->imp_target_uuid.uuid, rc,
diff --git a/lustre/obdclass/otree.c b/lustre/obdclass/otree.c
deleted file mode 100644 (file)
index 70f3077..0000000
+++ /dev/null
@@ -1,268 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- *
- *   This file is part of Lustre, http://www.lustre.org.
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
- *  Copyright (C) 2002, 2003  Cluster File Systems, Inc
- *
- *  our offset trees (otrees) track single-bit state of offsets in an
- *  extent tree.  
- */
-
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-#include <linux/version.h>
-#include <linux/config.h>
-#include <linux/module.h>
-
-#define DEBUG_SUBSYSTEM S_OSC
-#include <linux/kp30.h>
-#include <linux/obd.h>
-#include <linux/lustre_debug.h>
-#include <linux/lustre_otree.h>
-
-struct offset_extent {
-        rb_node_t       oe_node;
-        unsigned long   oe_start, oe_end;
-};
-
-static struct offset_extent * ot_find_oe(rb_root_t *root,
-                                         struct offset_extent *needle)
-{
-        struct rb_node_s *node = root->rb_node;
-        struct offset_extent *oe;
-        ENTRY;
-
-        CDEBUG(D_INODE, "searching [%lu -> %lu]\n", needle->oe_start,
-               needle->oe_end);
-
-        while (node) {
-                oe = rb_entry(node, struct offset_extent, oe_node);
-                if (needle->oe_end < oe->oe_start)
-                        node = node->rb_left;
-                else if (needle->oe_start > oe->oe_end)
-                        node = node->rb_right;
-                else {
-                        CDEBUG(D_INODE, "returning [%lu -> %lu]\n",
-                               oe->oe_start, oe->oe_end);
-                        RETURN(oe);
-                }
-        }
-        RETURN(NULL);
-}
-
-/* do the rbtree mechanics to insert a node, callers are responsible
- * for making sure that this new node doesn't overlap with existing
- * nodes */
-static void ot_insert_oe(rb_root_t *root, struct offset_extent *new_oe)
-{
-        rb_node_t ** p = &root->rb_node;
-        rb_node_t * parent = NULL;
-        struct offset_extent *oe;
-        ENTRY;
-
-        LASSERT(new_oe->oe_start <= new_oe->oe_end);
-
-        while (*p) {
-                parent = *p;
-                oe = rb_entry(parent, struct offset_extent, oe_node);
-                if ( new_oe->oe_end < oe->oe_start )
-                        p = &(*p)->rb_left;
-                else if ( new_oe->oe_start > oe->oe_end )
-                        p = &(*p)->rb_right;
-                else
-                        LBUG();
-        }
-        rb_link_node(&new_oe->oe_node, parent, p);
-        rb_insert_color(&new_oe->oe_node, root);
-        EXIT;
-}
-
-int ot_mark_offset(struct otree *ot, unsigned long offset)
-{
-        struct offset_extent needle, *oe, *new_oe;
-        int rc = 0;
-        ENTRY;
-
-        OBD_ALLOC(new_oe, sizeof(*new_oe));
-        if (new_oe == NULL)
-                RETURN(-ENOMEM);
-
-        spin_lock(&ot->ot_lock);
-
-        /* find neighbours that we might glom on to */
-        needle.oe_start = (offset > 0) ? offset - 1 : offset;
-        needle.oe_end = (offset < ~0) ? offset + 1 : offset;
-        oe = ot_find_oe(&ot->ot_root, &needle);
-        if ( oe == NULL ) {
-                new_oe->oe_start = offset;
-                new_oe->oe_end = offset;
-                ot_insert_oe(&ot->ot_root, new_oe);
-                ot->ot_num_marked++;
-                new_oe = NULL;
-                GOTO(out, rc);
-        }
-
-        /* already recorded */
-        if ( offset >= oe->oe_start && offset <= oe->oe_end )
-                GOTO(out, rc);
-
-        /* ok, need to check for adjacent neighbours */
-        needle.oe_start = offset;
-        needle.oe_end = offset;
-        if (ot_find_oe(&ot->ot_root, &needle))
-                GOTO(out, rc);
-
-        /* ok, its safe to extend the oe we found */
-        if ( offset == oe->oe_start - 1 )
-                oe->oe_start--;
-        else if ( offset == oe->oe_end + 1 )
-                oe->oe_end++;
-        else
-                LBUG();
-        ot->ot_num_marked++;
-
-out:
-        CDEBUG(D_INODE, "%lu now dirty\n", ot->ot_num_marked);
-        spin_unlock(&ot->ot_lock);
-        if (new_oe)
-                OBD_FREE(new_oe, sizeof(*new_oe));
-        RETURN(rc);
-}
-
-int ot_clear_extent(struct otree *ot, unsigned long start, unsigned long end)
-{
-        struct offset_extent needle, *oe, *new_oe;
-        int rc = 0;
-        ENTRY;
-
-        /* will allocate more intelligently later */
-        OBD_ALLOC(new_oe, sizeof(*new_oe));
-        if (new_oe == NULL)
-                RETURN(-ENOMEM);
-
-        needle.oe_start = start;
-        needle.oe_end = end;
-
-        spin_lock(&ot->ot_lock);
-        for ( ; (oe = ot_find_oe(&ot->ot_root, &needle)) ; ) {
-                rc = 0;
-
-                /* see if we're punching a hole and need to create a node */
-                if (oe->oe_start < start && oe->oe_end > end) {
-                        new_oe->oe_start = end + 1;
-                        new_oe->oe_end = oe->oe_end;
-                        oe->oe_end = start - 1;
-                        ot_insert_oe(&ot->ot_root, new_oe);
-                        new_oe = NULL;
-                        ot->ot_num_marked -= end - start + 1;
-                        break;
-                }
-
-                /* overlapping edges */
-                if (oe->oe_start < start && oe->oe_end <= end) {
-                        ot->ot_num_marked -= oe->oe_end - start + 1;
-                        oe->oe_end = start - 1;
-                        oe = NULL;
-                        continue;
-                }
-                if (oe->oe_end > end && oe->oe_start >= start) {
-                        ot->ot_num_marked -= end - oe->oe_start + 1;
-                        oe->oe_start = end + 1;
-                        oe = NULL;
-                        continue;
-                }
-
-                /* an extent entirely within the one we're clearing */
-                rb_erase(&oe->oe_node, &ot->ot_root);
-                ot->ot_num_marked -= oe->oe_end - oe->oe_start + 1;
-                spin_unlock(&ot->ot_lock);
-                OBD_FREE(oe, sizeof(*oe));
-                spin_lock(&ot->ot_lock);
-        }
-        CDEBUG(D_INODE, "%lu now dirty\n", ot->ot_num_marked);
-        spin_unlock(&ot->ot_lock);
-        if (new_oe)
-                OBD_FREE(new_oe, sizeof(*new_oe));
-        RETURN(rc);
-}
-
-int ot_find_marked_extent(struct otree *ot, unsigned long *start,
-                  unsigned long *end)
-{
-        struct offset_extent needle, *oe;
-        int rc = -ENOENT;
-        ENTRY;
-
-        needle.oe_start = *start;
-        needle.oe_end = *end;
-
-        spin_lock(&ot->ot_lock);
-        oe = ot_find_oe(&ot->ot_root, &needle);
-        if (oe) {
-                *start = oe->oe_start;
-                *end = oe->oe_end;
-                rc = 0;
-        }
-        spin_unlock(&ot->ot_lock);
-
-        RETURN(rc);
-}
-
-int ot_last_marked(struct otree *ot, unsigned long *last)
-{
-        struct rb_node_s *found, *node;
-        struct offset_extent *oe;
-        int rc = -ENOENT;
-        ENTRY;
-
-        spin_lock(&ot->ot_lock);
-        for (node = ot->ot_root.rb_node, found = NULL;
-             node;
-             found = node, node = node->rb_right)
-                ;
-
-        if (found) {
-                oe = rb_entry(found, struct offset_extent, oe_node);
-                *last = oe->oe_end;
-                rc = 0;
-        }
-        spin_unlock(&ot->ot_lock);
-        RETURN(rc);
-}
-
-unsigned long ot_num_marked(struct otree *ot)
-{
-        return ot->ot_num_marked;
-}
-
-void ot_init(struct otree *ot)
-{
-        CDEBUG(D_INODE, "initializing %p\n", ot);
-        spin_lock_init(&ot->ot_lock);
-        ot->ot_num_marked = 0;
-        ot->ot_root.rb_node = NULL;
-}
-
-EXPORT_SYMBOL(ot_mark_offset);
-EXPORT_SYMBOL(ot_clear_extent);
-EXPORT_SYMBOL(ot_find_marked_extent);
-EXPORT_SYMBOL(ot_last_marked);
-EXPORT_SYMBOL(ot_num_marked);
-EXPORT_SYMBOL(ot_init);
index 9f0b5ed..df2fd65 100644 (file)
@@ -16,6 +16,8 @@
 #include <linux/lustre_handles.h>
 #include <linux/obd.h>
 
+#define FILTER_LAYOUT_VERSION "2"
+
 #ifndef OBD_FILTER_DEVICENAME
 # define OBD_FILTER_DEVICENAME "obdfilter"
 #endif
@@ -25,7 +27,7 @@
 #endif
 
 #define LAST_RCVD "last_rcvd"
-#define FILTER_INIT_OBJID 2
+#define FILTER_INIT_OBJID 0
 
 #define FILTER_LR_SERVER_SIZE    512
 
@@ -37,6 +39,7 @@
 #define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
 
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
+#define FILTER_GROUPS 2 /* must be at least 2; not dynamic yet */
 
 #define FILTER_MOUNT_RECOV 2
 #define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
@@ -45,7 +48,7 @@
 struct filter_server_data {
         __u8  fsd_uuid[37];        /* server UUID */
         __u8  fsd_uuid_padding[3]; /* unused */
-        __u64 fsd_last_objid;      /* last created object ID */
+        __u64 fsd_unused;
         __u64 fsd_last_transno;    /* last completed transaction ID */
         __u64 fsd_mount_count;     /* FILTER incarnation number */
         __u32 fsd_feature_compat;  /* compatible feature flags */
@@ -72,22 +75,6 @@ struct filter_client_data {
         __u8  fcd_padding[FILTER_LR_CLIENT_SIZE - 64];
 };
 
-/* file data for open files on OST */
-struct filter_file_data {
-        struct portals_handle ffd_handle;
-        atomic_t              ffd_refcount;
-        struct list_head      ffd_export_list; /* export open list - fed_lock */
-        struct file          *ffd_file;         /* file handle */
-};
-
-struct filter_dentry_data {
-        struct llog_cookie      fdd_cookie;
-        obd_id                  fdd_objid;
-        __u32                   fdd_magic;
-        atomic_t                fdd_open_count;
-        int                     fdd_flags;
-};
-
 #define FILTER_DENTRY_MAGIC 0x9efba101
 #define FILTER_FLAG_DESTROY 0x0001      /* destroy dentry on last file close */
 
@@ -103,21 +90,21 @@ enum {
 };
 
 /* filter.c */
-struct dentry *filter_parent(struct obd_device *, obd_mode mode, obd_id objid);
-struct dentry *filter_parent_lock(struct obd_device *, obd_mode mode,
-                                  obd_id objid, ldlm_mode_t lock_mode,
-                                  struct lustre_handle *lockh);
+struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid);
+struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id,
+                                  ldlm_mode_t, struct lustre_handle *);
 void f_dput(struct dentry *);
 struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
-                                 obd_mode mode, obd_id id);
+                                 obd_gr group, obd_id id);
 struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
                                   const char *what);
 #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__)
 
 int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
-__u64 filter_next_id(struct filter_obd *);
+__u64 filter_next_id(struct filter_obd *, struct obdo *);
 int filter_update_server_data(struct obd_device *, struct file *,
                               struct filter_server_data *, int force_sync);
+int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, obd_count len, void *buf,
                         char *option);
 
@@ -128,12 +115,19 @@ int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
 int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
                     struct obd_ioobj *, int niocount, struct niobuf_local *,
                     struct obd_trans_info *);
-int filter_brw(int cmd, struct lustre_handle *, struct obdo *,
+int filter_brw(int cmd, struct obd_export *, struct obdo *,
               struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
               struct obd_trans_info *);
+void flip_into_page_cache(struct inode *inode, struct page *new_page);
+
+/* filter_io_*.c */
+int filter_commitrw_write(struct obd_export *exp, int objcount,
+                          struct obd_ioobj *obj, int niocount,
+                          struct niobuf_local *res,
+                          struct obd_trans_info *oti);
 
 /* filter_log.c */
-int filter_log_cancel(struct lustre_handle *, struct lov_stripe_md *,
+int filter_log_cancel(struct obd_export *, struct lov_stripe_md *,
                       int num_cookies, struct llog_cookie *, int flags);
 int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
                          obd_id oid, obd_count ogen, struct llog_cookie *);
@@ -147,4 +141,5 @@ int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
 int filter_san_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
                       struct obd_ioobj *, int niocount, struct niobuf_remote *);
 
+
 #endif
index 971cf1d..4240800 100644 (file)
@@ -43,8 +43,8 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
         int rc;
 
         page = grab_cache_page(mapping, index); /* locked page */
-        if (IS_ERR(page))
-                return lnb->rc = PTR_ERR(page);
+        if (page == NULL)
+                return lnb->rc = -ENOMEM;
 
         LASSERT(page->mapping == mapping);
 
@@ -99,138 +99,6 @@ err_page:
         return lnb->rc;
 }
 
-static struct page *lustre_get_page_write(struct inode *inode,
-                                          unsigned long index)
-{
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        page = grab_cache_page(mapping, index); /* locked page */
-
-        if (!IS_ERR(page)) {
-                /* Note: Called with "O" and "PAGE_SIZE" this is essentially
-                 * a no-op for most filesystems, because we write the whole
-                 * page.  For partial-page I/O this will read in the page.
-                 */
-                rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
-                if (rc) {
-                        CERROR("page index %lu, rc = %d\n", index, rc);
-                        if (rc != -ENOSPC)
-                                LBUG();
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-        }
-        return page;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-        return ERR_PTR(rc);
-}
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-int wait_on_page_locked(struct page *page)
-{
-        waitfor_one_page(page);
-        return 0;
-}
-
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data. */
-static inline void inode_update_time(struct inode *inode, int ctime_too)
-{
-        time_t now = CURRENT_TIME;
-        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
-                return;
-        inode->i_mtime = now;
-        if (ctime_too)
-                inode->i_ctime = now;
-        mark_inode_dirty_sync(inode);
-}
-#endif
-
-static int lustre_commit_write(struct niobuf_local *lnb)
-{
-        struct page *page = lnb->page;
-        unsigned from = lnb->offset & ~PAGE_MASK;
-        unsigned to = from + lnb->len;
-        struct inode *inode = page->mapping->host;
-        int err;
-
-        LASSERT(to <= PAGE_SIZE);
-        err = page->mapping->a_ops->commit_write(NULL, page, from, to);
-#warning 2.4 folks: wait_on_page_locked does NOT return its error here.
-        if (!err && IS_SYNC(inode))
-                wait_on_page_locked(page);
-        //SetPageUptodate(page); // the client commit_write will do this
-
-        SetPageReferenced(page);
-        unlock_page(page);
-        page_cache_release(page);
-        return err;
-}
-
-int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
-                          int *pglocked)
-{
-        unsigned long index = lnb->offset >> PAGE_SHIFT;
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
-        if (*pglocked)
-                page = grab_cache_page_nowait(mapping, index); /* locked page */
-        else
-                page = grab_cache_page(mapping, index); /* locked page */
-
-
-        /* This page is currently locked, so get a temporary page instead. */
-        if (page == NULL) {
-                CDEBUG(D_INFO, "ino %lu page %ld locked\n", inode->i_ino,index);
-                page = alloc_pages(GFP_KERNEL, 0); /* locked page */
-                if (page == NULL) {
-                        CERROR("no memory for a temp page\n");
-                        GOTO(err, rc = -ENOMEM);
-                }
-                page->index = index;
-                lnb->page = page;
-                lnb->flags |= N_LOCAL_TEMP_PAGE;
-        } else if (!IS_ERR(page)) {
-                unsigned from = lnb->offset & ~PAGE_MASK, to = from + lnb->len;
-                (*pglocked)++;
-
-                rc = mapping->a_ops->prepare_write(NULL, page, from, to);
-                if (rc) {
-                        if (rc != -ENOSPC)
-                                CERROR("page index %lu, rc = %d\n", index, rc);
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-                lnb->page = page;
-        }
-
-        return 0;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-err:
-        return lnb->rc = rc;
-}
-
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
@@ -240,7 +108,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         struct obd_run_ctxt saved;
         struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
+        struct niobuf_local *lnb = NULL;
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
         struct inode *inode;
@@ -260,7 +128,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
         for (i = 0, o = obj; i < objcount; i++, o++) {
-                struct filter_dentry_data *fdd;
                 LASSERT(o->ioo_bufcnt);
 
                 dentry = filter_oa2dentry(exp->exp_obd, oa);
@@ -276,15 +143,13 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 fso[i].fso_dentry = dentry;
                 fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                fdd = dentry->d_fsdata;
-                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
-                               o->ioo_id);
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
+                       (jiffies - now));
 
         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                 dentry = fso[i].fso_dentry;
@@ -325,7 +190,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
                             tot_bytes);
@@ -340,7 +208,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         EXIT;
 
@@ -355,7 +226,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                         f_dput(res->dentry);
                 else
                         CERROR("NULL dentry in cleanup -- tell CFS\n");
-                res->dentry = NULL;
         case 0:
                 OBD_FREE(fso, objcount * sizeof(*fso));
                 pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
@@ -363,41 +233,18 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         return rc;
 }
 
-/* We need to balance prepare_write() calls with commit_write() calls.
- * If the page has been prepared, but we have no data for it, we don't
- * want to overwrite valid data on disk, but we still need to zero out
- * data for space which was newly allocated.  Like part of what happens
- * in __block_prepare_write() for newly allocated blocks.
- *
- * XXX currently __block_prepare_write() creates buffers for all the
- *     pages, and the filesystems mark these buffers as BH_New if they
- *     were newly allocated from disk. We use the BH_New flag similarly. */
-static int filter_commit_write(struct niobuf_local *lnb, int err)
+static int filter_start_page_write(struct inode *inode,
+                                   struct niobuf_local *lnb)
 {
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        if (err) {
-                unsigned block_start, block_end;
-                struct buffer_head *bh, *head = lnb->page->buffers;
-                unsigned blocksize = head->b_size;
-
-                /* debugging: just seeing if this ever happens */
-                CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
-                       "called for ino %lu:%lu on err %d\n",
-                       lnb->page->mapping->host->i_ino, lnb->page->index, err);
-
-                /* Currently one buffer per page, but in the future... */
-                for (bh = head, block_start = 0; bh != head || !block_start;
-                     block_start = block_end, bh = bh->b_this_page) {
-                        block_end = block_start + blocksize;
-                        if (buffer_new(bh)) {
-                                memset(kmap(lnb->page) + block_start, 0,
-                                       blocksize);
-                                kunmap(lnb->page);
-                        }
-                }
+        struct page *page = alloc_pages(GFP_HIGHUSER, 0);
+        if (page == NULL) {
+                CERROR("no memory for a temp page\n");
+                RETURN(lnb->rc = -ENOMEM);
         }
-#endif
-        return lustre_commit_write(lnb);
+        page->index = lnb->offset >> PAGE_SHIFT;
+        lnb->page = page;
+
+        return 0;
 }
 
 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
@@ -417,124 +264,72 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
-        struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
-        struct fsfilt_objinfo *fso;
+        struct niobuf_local *lnb = NULL;
+        struct fsfilt_objinfo fso;
         struct dentry *dentry;
-        int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+        int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
         ENTRY;
         LASSERT(objcount == 1);
-
-        OBD_ALLOC(fso, objcount * sizeof(*fso));
-        if (fso == NULL)
-                RETURN(-ENOMEM);
+        LASSERT(obj->ioo_bufcnt > 0);
 
         memset(res, 0, niocount * sizeof(*res));
 
         push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
-        for (i = 0, o = obj; i < objcount; i++, o++) {
-                struct filter_dentry_data *fdd;
-                LASSERT(o->ioo_bufcnt);
-
-                dentry = filter_oa2dentry(exp->exp_obd, oa);
-                if (IS_ERR(dentry))
-                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
-
-                if (dentry->d_inode == NULL) {
-                        CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               o->ioo_id);
-                        f_dput(dentry);
-                        GOTO(out_objinfo, rc = -ENOENT);
-                }
-
-                fso[i].fso_dentry = dentry;
-                fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                down(&dentry->d_inode->i_sem);
-                fdd = dentry->d_fsdata;
-                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
-                               o->ioo_id);
+        dentry = filter_fid2dentry(exp->exp_obd, NULL, 0, obj->ioo_id);
+        if (IS_ERR(dentry))
+                GOTO(cleanup, rc = PTR_ERR(dentry));
+
+        if (dentry->d_inode == NULL) {
+                CERROR("trying to BRW to non-existent file "LPU64"\n",
+                       obj->ioo_id);
+                f_dput(dentry);
+                GOTO(cleanup, rc = -ENOENT);
         }
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
-
-        LASSERT(oti != NULL);
-        oti->oti_handle = fsfilt_brw_start(exp->exp_obd, objcount, fso,
-                                           niocount, oti);
-        if (IS_ERR(oti->oti_handle)) {
-                rc = PTR_ERR(oti->oti_handle);
-                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                       "error starting transaction: rc = %d\n", rc);
-                oti->oti_handle = NULL;
-                GOTO(out_objinfo, rc);
-        }
-
-        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
-                dentry = fso[i].fso_dentry;
-                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        if (j == 0)
-                                lnb->dentry = dentry;
-                        else
-                                lnb->dentry = dget(dentry);
-
-                        lnb->offset = rnb->offset;
-                        lnb->len    = rnb->len;
-                        lnb->flags  = rnb->flags;
-                        lnb->start  = jiffies;
-
-                        rc = filter_get_page_write(dentry->d_inode, lnb,
-                                                   &pglocked);
-                        if (rc)
-                                up(&dentry->d_inode->i_sem);
+        fso.fso_dentry = dentry;
+        fso.fso_bufcnt = obj->ioo_bufcnt;
 
-                        if (rc) {
-                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
-                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
-                                       dentry, rc);
-                                f_dput(dentry);
-                                GOTO(out_pages, rc);
-                        }
-                        tot_bytes += lnb->len;
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
+                       (jiffies - now));
+
+        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
+             i++, lnb++, rnb++) {
+                lnb->dentry = dentry;
+                lnb->offset = rnb->offset;
+                lnb->len    = rnb->len;
+                lnb->flags  = rnb->flags;
+                lnb->start  = jiffies;
+
+                rc = filter_start_page_write(dentry->d_inode, lnb);
+                if (rc) {
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
+                               LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
+                               i, obj->ioo_bufcnt, dentry, rc);
+                        while (lnb-- > res)
+                                __free_pages(lnb->page, 0);
+                        f_dput(dentry);
+                        GOTO(cleanup, rc);
                 }
+                tot_bytes += lnb->len;
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
-
         EXIT;
-out:
-        OBD_FREE(fso, objcount * sizeof(*fso));
-        /* we saved the journal handle into oti->oti_handle instead */
-        current->journal_info = NULL;
+cleanup:
         pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
         return rc;
-
-out_pages:
-        while (lnb-- > res) {
-                filter_commit_write(lnb, rc);
-                up(&lnb->dentry->d_inode->i_sem);
-                f_dput(lnb->dentry);
-        }
-        filter_finish_transno(exp, oti, rc);
-        fsfilt_commit(exp->exp_obd,
-                      filter_parent(exp->exp_obd,S_IFREG,obj->ioo_id)->d_inode,
-                      oti->oti_handle, 0);
-        goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
-        for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
-                up(&fso[i].fso_dentry->d_inode->i_sem);
-                f_dput(fso[i].fso_dentry);
-        }
-        goto out;
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -551,55 +346,9 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                                           niocount, nb, res, oti);
 
         LBUG();
-
         return -EPROTO;
 }
 
-/* It is highly unlikely that we would ever get an error here.  The page we want
- * to get was previously locked, so it had to have already allocated the space,
- * and we were just writing over the same data, so there would be no hole in the
- * file.
- *
- * XXX: possibility of a race with truncate could exist, need to check that.
- *      There are no guarantees w.r.t. write order even on a local filesystem,
- *      although the normal response would be to return the number of bytes
- *      successfully written and leave the rest to the app. */
-static int filter_write_locked_page(struct niobuf_local *lnb)
-{
-        struct page *lpage;
-        void *lpage_addr, *lnb_addr;
-        int rc;
-        ENTRY;
-
-        lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
-        if (IS_ERR(lpage)) {
-                rc = PTR_ERR(lpage);
-                CERROR("error getting locked page index %ld: rc = %d\n",
-                       lnb->page->index, rc);
-                LBUG();
-                lustre_commit_write(lnb);
-                RETURN(rc);
-        }
-
-        /* 2 kmaps == vanishingly small deadlock opportunity */
-        lpage_addr = kmap(lpage);
-        lnb_addr = kmap(lnb->page);
-
-        memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
-
-        kunmap(lnb->page);
-        kunmap(lpage);
-
-        page_cache_release(lnb->page);
-
-        lnb->page = lpage;
-        rc = lustre_commit_write(lnb);
-        if (rc)
-                CERROR("error committing locked page %ld: rc = %d\n",
-                       lnb->page->index, rc);
-        RETURN(rc);
-}
-
 static int filter_commitrw_read(struct obd_export *exp, int objcount,
                                 struct obd_ioobj *obj, int niocount,
                                 struct niobuf_local *res,
@@ -621,144 +370,50 @@ static int filter_commitrw_read(struct obd_export *exp, int objcount,
         RETURN(0);
 }
 
-static int
-filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
-                      int objcount, struct obd_ioobj *obj, int niocount,
-                      struct niobuf_local *res, struct obd_trans_info *oti)
+void flip_into_page_cache(struct inode *inode, struct page *new_page)
 {
-        struct obd_run_ctxt saved;
-        struct obd_ioobj *o;
-        struct niobuf_local *lnb;
-        struct obd_device *obd = exp->exp_obd;
-        int found_locked = 0, rc = 0, i;
-        int nested_trans = current->journal_info != NULL;
-        unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
+        struct page *old_page;
+        int rc;
         ENTRY;
 
-        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
-        if (cmd & OBD_BRW_WRITE) {
-                LASSERT(oti);
-                LASSERT(current->journal_info == NULL ||
-                        current->journal_info == oti->oti_handle);
-                current->journal_info = oti->oti_handle;
-        }
-
-        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
-                struct inode *inode;
-                int j;
-
-                /* If all of the page reads were beyond EOF, let's pretend
-                 * this read didn't really happen at all. */
-                if (lnb->dentry == NULL) {
-                        oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
-                        continue;
-                }
-
-                inode = igrab(lnb->dentry->d_inode);
-
-                if (cmd & OBD_BRW_WRITE) {
-                        /* FIXME: MULTI OBJECT BRW */
-                        if (oa && oa->o_valid & (OBD_MD_FLMTIME|OBD_MD_FLCTIME))
-                                obdo_refresh_inode(inode, oa, OBD_MD_FLATIME |
-                                                   OBD_MD_FLMTIME |
-                                                   OBD_MD_FLCTIME);
-                        else
-                                inode_update_time(lnb->dentry->d_inode, 1);
-                } else if (oa && oa->o_valid & OBD_MD_FLATIME) {
-                        /* Note that we don't necessarily write this to disk */
-                        obdo_refresh_inode(inode, oa, OBD_MD_FLATIME);
-                }
-
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        if (lnb->page == NULL) {
-                                continue;
-                        }
-
-                        if (lnb->flags & N_LOCAL_TEMP_PAGE) {
-                                found_locked++;
-                                continue;
-                        }
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-
-                        if (cmd & OBD_BRW_WRITE) {
-                                int err = filter_commit_write(lnb, 0);
-
-                                if (!rc)
-                                        rc = err;
-                        } else {
-                                page_cache_release(lnb->page);
-                        }
-
-                        f_dput(lnb->dentry);
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
+        do {
+                /* the dlm is protecting us from read/write concurrency, so we
+                 * expect this find_lock_page to return quickly.  even if we
+                 * race with another writer it won't be doing much work with
+                 * the page locked.  we do this because truncate_complete_page
+                 * expects a locked page, and it wants to grab the pagecache
+                 * lock as well. */
+                old_page = find_lock_page(inode->i_mapping, new_page->index);
+                if (old_page) {
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+                        truncate_complete_page(old_page);
+#else
+                        truncate_complete_page(old_page->mapping, old_page);
+#endif
+                        unlock_page(old_page);
+                        page_cache_release(old_page);
                 }
 
-                /* FIXME: MULTI OBJECT BRW */
-                if (oa) {
-                        oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
-                        obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
+#if 0 /* this should be a /proc tunable someday */
+                /* racing o_directs (no locking ioctl) could race adding
+                 * their pages, so we repeat the page invalidation unless
+                 * we successfully added our new page */
+                rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
+                                              new_page->index,
+                                              page_hash(inode->i_mapping, 
+                                                        new_page->index));
+                if (rc == 0) {
+                        /* add_to_page_cache clears uptodate|dirty and locks
+                         * the page */
+                        SetPageUptodate(new_page);
+                        unlock_page(new_page);
                 }
+#else   
+                rc = 0;
+#endif
+        } while (rc != 0);
 
-                if (cmd & OBD_BRW_WRITE)
-                        up(&inode->i_sem);
-
-                iput(inode);
-        }
-
-        for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
-             i++, o++) {
-                int j;
-
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        int err;
-                        if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
-                                continue;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw locked %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-
-                        err = filter_write_locked_page(lnb);
-                        if (!rc)
-                                rc = err;
-                        f_dput(lnb->dentry);
-                        found_locked--;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write locked %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-                }
-        }
-
-        if (cmd & OBD_BRW_WRITE) {
-                /* We just want any dentry for the commit, for now */
-                struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
-                int err;
-
-                rc = filter_finish_transno(exp, oti, rc);
-                err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
-                                    obd_sync_filter);
-                if (err)
-                        rc = err;
-                if (obd_sync_filter)
-                        LASSERT(oti->oti_transno <= obd->obd_last_committed);
-                if (time_after(jiffies, now + 15 * HZ))
-                        CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
-        }
-
-        LASSERT(nested_trans || current->journal_info == NULL);
-        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        RETURN(rc);
+        EXIT;
 }
 
 /* XXX needs to trickle its oa down */
@@ -767,8 +422,8 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                     struct niobuf_local *res, struct obd_trans_info *oti)
 {
         if (cmd == OBD_BRW_WRITE)
-                return filter_commitrw_write(cmd, exp, oa, objcount, obj,
-                                             niocount, res, oti);
+                return filter_commitrw_write(exp, objcount, obj, niocount,
+                                             res, oti);
         if (cmd == OBD_BRW_READ)
                 return filter_commitrw_read(exp, objcount, obj, niocount,
                                             res, oti);
@@ -776,11 +431,10 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         return -EPROTO;
 }
 
-int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
+int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md *lsm, obd_count oa_bufs,
                struct brw_page *pga, struct obd_trans_info *oti)
 {
-        struct obd_export *exp;
         struct obd_ioobj ioo;
         struct niobuf_local *lnb;
         struct niobuf_remote *rnb;
@@ -788,12 +442,6 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
         int ret = 0;
         ENTRY;
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
-                RETURN(-EINVAL);
-        }
-
         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
 
@@ -826,8 +474,8 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
                 else
                         memcpy(virt + off, addr + off, pga[i].count);
 
-                kunmap(addr);
-                kunmap(virt);
+                kunmap(lnb[i].page);
+                kunmap(pga[i].pg);
         }
 
         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
@@ -837,6 +485,5 @@ out:
                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
         if (rnb)
                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
-        class_export_put(exp);
         RETURN(ret);
 }
index 77eb078..d63847c 100644 (file)
@@ -57,7 +57,7 @@ static int filter_log_close(struct llog_handle *cathandle,
         if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
                 CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
                        lgl->lgl_oid, lgl->lgl_ogen);
-                dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG,
+                dparent = filter_parent_lock(loghandle->lgh_obd, 1,
                                              lgl->lgl_oid,LCK_PW,&parent_lockh);
                 if (IS_ERR(dparent)) {
                         rc = PTR_ERR(dparent);
@@ -107,7 +107,7 @@ static struct llog_handle *filter_log_open(struct obd_device *obd,
         if (!loghandle)
                 RETURN(ERR_PTR(-ENOMEM));
 
-        dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid);
+        dchild = filter_fid2dentry(obd, NULL, 1, lgl->lgl_oid);
         if (IS_ERR(dchild))
                 GOTO(out_handle, rc = PTR_ERR(dchild));
 
@@ -157,6 +157,7 @@ static struct llog_handle *filter_log_create(struct obd_device *obd)
         struct dentry *dparent, *dchild;
         struct llog_handle *loghandle;
         struct file *file;
+        struct obdo obdo;
         int err, rc;
         obd_id id;
         ENTRY;
@@ -165,14 +166,17 @@ static struct llog_handle *filter_log_create(struct obd_device *obd)
         if (!loghandle)
                 RETURN(ERR_PTR(-ENOMEM));
 
+        memset(&obdo, 0, sizeof(obdo));
+        obdo.o_valid = OBD_MD_FLGROUP;
+        obdo.o_gr = 1; /* FIXME: object groups */
  retry:
-        id = filter_next_id(filter);
+        id = filter_next_id(filter, &obdo);
 
-        dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh);
+        dparent = filter_parent_lock(obd, obdo.o_gr, id, LCK_PW, &parent_lockh);
         if (IS_ERR(dparent))
                 GOTO(out_ctxt, rc = PTR_ERR(dparent));
 
-        dchild = filter_fid2dentry(obd, dparent, S_IFREG, id);
+        dchild = filter_fid2dentry(obd, dparent, obdo.o_gr, id);
         if (IS_ERR(dchild))
                 GOTO(out_lock, rc = PTR_ERR(dchild));
 
@@ -192,8 +196,7 @@ static struct llog_handle *filter_log_create(struct obd_device *obd)
                 GOTO(out_child, rc);
         }
 
-        rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                       filter->fo_fsd, 0);
+        rc = filter_update_last_objid(obd, obdo.o_gr, 0);
         if (rc) {
                 CERROR("can't write lastobjid but log created: rc %d\n",rc);
                 GOTO(out_destroy, rc);
@@ -309,11 +312,11 @@ void filter_put_catalog(struct llog_handle *cathandle)
         EXIT;
 }
 
-int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+int filter_log_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
                       int num_cookies, struct llog_cookie *logcookies,
                       int flags)
 {
-        struct obd_device *obd = class_conn2obd(conn);
+        struct obd_device *obd = exp->exp_obd;
         struct obd_run_ctxt saved;
         int rc;
         ENTRY;
index 750d16c..a2ef568 100644 (file)
@@ -574,7 +574,7 @@ __s32 portals_debug_copy_to_user(char *buf, unsigned long len);
 #endif
 void portals_debug_msg(int subsys, int mask, char *file, const char *fn,
                        const int line, unsigned long stack,
-                       const char *format, ...)
+                       char *format, ...)
         __attribute__ ((format (printf, 7, 8)));
 #else
 void portals_debug_msg(int subsys, int mask, char *file, const char *fn,
index 78a1e2d..57713cb 100644 (file)
@@ -16,6 +16,8 @@ struct list_head {
        struct list_head *next, *prev;
 };
 
+typedef struct list_head list_t;
+
 #define LIST_HEAD_INIT(name) { &(name), &(name) }
 
 #define LIST_HEAD(name) \
index e9b0e12..16ef401 100644 (file)
@@ -636,7 +636,7 @@ int portals_debug_mark_buffer(char *text)
                 return -EINVAL;
 
         CDEBUG(0, "********************************************************\n");
-        CDEBUG(0, "DEBUG MARKER: %s\n", text);
+        CERROR("DEBUG MARKER: %s\n", text);
         CDEBUG(0, "********************************************************\n");
 
         return 0;
@@ -644,39 +644,104 @@ int portals_debug_mark_buffer(char *text)
 #undef DEBUG_SUBSYSTEM
 #define DEBUG_SUBSYSTEM S_PORTALS
 
+/* this copies a snapshot of the debug buffer into an array of pages
+ * before doing the potentially blocking copy into userspace. it could
+ * be warning userspace if things wrap heavily while it's off copying. */
 __s32 portals_debug_copy_to_user(char *buf, unsigned long len)
 {
         int rc;
-        unsigned long debug_off;
+        unsigned long debug_off, i, off, copied;
         unsigned long flags;
+        struct page *page;
+        LIST_HEAD(my_pages);
+        struct list_head *pos, *n;
 
         if (len < debug_size)
                 return -ENOSPC;
 
-        debug_off = atomic_read(&debug_off_a);
+        for (i = 0 ; i < debug_size; i += PAGE_SIZE) {
+                page = alloc_page(GFP_NOFS);
+                if (page == NULL) {
+                        rc = -ENOMEM;
+                        goto cleanup;
+                }
+                list_add(&page->list, &my_pages);
+        }
+        
         spin_lock_irqsave(&portals_debug_lock, flags);
-        if (debug_wrapped) {
-                /* All of this juggling with the 1s is to keep the trailing nul
-                 * (which falls at debug_buf + debug_off) at the end of what we
-                 * copy into user space */
-                copy_to_user(buf, debug_buf + debug_off + 1,
-                             debug_size - debug_off - 1);
-                copy_to_user(buf + debug_size - debug_off - 1,
-                             debug_buf, debug_off + 1);
-                rc = debug_size;
-        } else {
-                copy_to_user(buf, debug_buf, debug_off);
-                rc = debug_off;
+        debug_off = atomic_read(&debug_off_a);
+        
+        /* Sigh. If the buffer is empty, then skip to the end. */
+        if (debug_off == 0 && !debug_wrapped) {
+                spin_unlock_irqrestore(&portals_debug_lock, flags);
+                rc = 0;
+                goto cleanup;
         }
+
+        if (debug_wrapped)
+                off = debug_off + 1;
+        else 
+                off = 0;
+        copied = 0;
+        list_for_each(pos, &my_pages) {
+                unsigned long to_copy;
+                page = list_entry(pos, struct page, list);
+
+                to_copy = min(debug_size - off, PAGE_SIZE);
+                if (to_copy == 0) {
+                        off = 0;
+                        to_copy = min(debug_size - off, PAGE_SIZE);
+                }
+finish_partial:
+                memcpy(kmap(page), debug_buf + off, to_copy);
+                kunmap(page);
+                copied += to_copy;
+                if (copied >= (debug_wrapped ? debug_size : debug_off))
+                        break;
+                        
+                off += to_copy;
+                if (off >= debug_size) {
+                        off = 0;
+                        if (to_copy != PAGE_SIZE) {
+                                to_copy = PAGE_SIZE - to_copy;
+                                goto finish_partial;
+                        }
+                }
+        }
+
         spin_unlock_irqrestore(&portals_debug_lock, flags);
 
+        off = 0;
+        list_for_each(pos, &my_pages) {
+                unsigned long to_copy;
+                page = list_entry(pos, struct page, list);
+
+                to_copy = min(copied - off, PAGE_SIZE);
+                rc = copy_to_user(buf + off, kmap(page), to_copy);
+                kunmap(page);
+                if (rc) {
+                        rc = -EFAULT;
+                        goto cleanup;
+                }
+                off += to_copy;
+                if (off >= copied)
+                        break;
+        }
+        rc = copied;
+        
+cleanup:
+        list_for_each_safe(pos, n, &my_pages) {
+                page = list_entry(pos, struct page, list);
+                list_del(&page->list);
+                __free_page(page);
+        }
         return rc;
 }
 
 /* FIXME: I'm not very smart; someone smarter should make this better. */
 void
 portals_debug_msg(int subsys, int mask, char *file, const char *fn,
-                  const int line, unsigned long stack, const char *format, ...)
+                  const int line, unsigned long stack, char *format, ...)
 {
         va_list       ap;
         unsigned long flags;
@@ -731,33 +796,34 @@ portals_debug_msg(int subsys, int mask, char *file, const char *fn,
         do_gettimeofday(&tv);
 
         prefix_nob = snprintf(debug_buf + debug_off, max_nob,
-                              "%06x:%06x:%d:%lu.%06lu ",
+                              "%06x:%06x:%d:%lu.%06lu :",
                               subsys, mask, smp_processor_id(),
                               tv.tv_sec, tv.tv_usec);
         max_nob -= prefix_nob;
+        if(*(format + strlen(format) - 1) == '\n')
+                *(format + strlen(format) - 1) = ':';
+           
+        va_start(ap, format);
+        msg_nob = vsnprintf(debug_buf + debug_off + prefix_nob ,
+                            max_nob, format, ap);
+        max_nob -= msg_nob;
+        va_end(ap);
 
 #if defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,20))
-        msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
-                           "(%s:%d:%s() %d | %d+%lu)",
+        msg_nob += snprintf(debug_buf + debug_off + prefix_nob + msg_nob, max_nob,
+                           "(%s:%d:%s() %d | %d+%lu)\n",
                            file, line, fn, current->pid,
                            current->thread.extern_pid, stack);
 #elif defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
-                           "(%s:%d:%s() %d | %d+%lu)",
+        msg_nob += snprintf(debug_buf + debug_off + prefix_nob + msg_nob, max_nob,
+                           "(%s:%d:%s() %d | %d+%lu)\n",
                            file, line, fn, current->pid,
                            current->thread.mode.tt.extern_pid, stack);
 #else
-        msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
-                           "(%s:%d:%s() %d+%lu)",
+        msg_nob += snprintf(debug_buf + debug_off + prefix_nob + msg_nob, max_nob,
+                           "(%s:%d:%s() %d+%lu)\n",
                            file, line, fn, current->pid, stack);
 #endif
-        max_nob -= msg_nob;
-
-        va_start(ap, format);
-        msg_nob += vsnprintf(debug_buf + debug_off + prefix_nob + msg_nob,
-                             max_nob, format, ap);
-        max_nob -= msg_nob;
-        va_end(ap);
 
         /* Print to console, while msg is contiguous in debug_buf */
         /* NB safely terminated see above */
index 0a009d2..5921259 100644 (file)
@@ -56,6 +56,8 @@ static int max = 8192;
 static int subsystem_mask = ~0;
 static int debug_mask = ~0;
 
+#define MAX_MARK_SIZE 100
+
 static const char *portal_debug_subsystems[] =
         {"undefined", "mdc", "mds", "osc", "ost", "class", "log", "llite",
          "rpc", "mgmt", "portals", "socknal", "qswnal", "pinger", "filter",
@@ -480,22 +482,29 @@ int jt_dbg_clear_debug_buf(int argc, char **argv)
 
 int jt_dbg_mark_debug_buf(int argc, char **argv)
 {
-        int rc;
+        int rc, max_size = MAX_MARK_SIZE-1;
         struct portal_ioctl_data data;
         char *text;
         time_t now = time(NULL);
 
-        if (argc > 2) {
-                fprintf(stderr, "usage: %s [marker text]\n", argv[0]);
-                return 0;
-        }
-
-        if (argc == 2) {
-                text = argv[1];
+        if (argc > 1) {
+                int counter;
+                text = malloc(MAX_MARK_SIZE);
+                strncpy(text, argv[1], max_size);
+                max_size-=strlen(argv[1]);
+                for(counter = 2; (counter < argc) && (max_size > 0) ; counter++){
+                        strncat(text, " ", 1);
+                        max_size-=1;
+                        strncat(text, argv[counter], max_size);
+                        max_size-=strlen(argv[counter]);
+                }
         } else {
                 text = ctime(&now);
                 text[strlen(text) - 1] = '\0'; /* stupid \n */
         }
+        if (max_size <= 0) { /* budget used up or overshot: strncpy may have left text unterminated */
+                text[MAX_MARK_SIZE - 1] = '\0';
+        }
 
         memset(&data, 0, sizeof(data));
         data.ioc_inllen1 = strlen(text) + 1;
index 77d6fc3..19481fa 100644 (file)
@@ -306,6 +306,7 @@ int ptlrpc_stop_pinger(void)
                      (pinger_thread->t_flags & SVC_STOPPED), &lwi);
 
         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
+        pinger_thread = NULL;
 
  out:
         up(&pinger_sem);
index f45f352..8ae2c6e 100644 (file)
@@ -94,7 +94,17 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         init_MUTEX(&cli->cl_dirty_sem);
         cli->cl_dirty = 0;
         cli->cl_dirty_granted = 0;
+        cli->cl_dirty_max = 64*1024*1024; /* some default */
         cli->cl_ost_can_grant = 1;
+        INIT_LIST_HEAD(&cli->cl_cache_waiters);
+        init_waitqueue_head(&cli->cl_cache_waitq);
+        INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+        spin_lock_init(&cli->cl_loi_list_lock);
+        cli->cl_brw_in_flight = 0;
+        spin_lock_init(&cli->cl_rpc_concurrency_oh.oh_lock);
+        spin_lock_init(&cli->cl_pages_per_rpc_oh.oh_lock);
+        cli->cl_max_pages_per_rpc = PTL_MD_MAX_IOV;
+        cli->cl_max_rpcs_in_flight = 8;
 
         conn = ptlrpc_uuid_to_connection(&server_uuid);
         if (conn == NULL)
index d02c3e3..7c7b771 100755 (executable)
@@ -4,6 +4,7 @@
 #include <errno.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/mman.h>
 #include <signal.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -18,9 +19,12 @@ char usage[] =
 "        o  open(O_RDONLY)\n"
 "        O  open(O_CREAT|O_RDWR)\n"
 "        u  unlink\n"
+"        U  munmap\n"
 "        m  mknod\n"
+"        M  mmap to EOF (must open and stat prior)\n"
 "        c  close\n"
 "        _  wait for signal\n"
+"        R  reference entire mmap-ed region\n"
 "        r  read\n"
 "        s  stat\n"
 "        S  fstat\n"
@@ -35,6 +39,8 @@ int main(int argc, char **argv)
 {
         char *fname, *commands;
         struct stat st;
+       size_t mmap_len = 0, i; /* stays 0 until 'M' runs, so 'R' and 'U' never read garbage */
+       unsigned char *mmap_ptr = NULL, junk = 0;
         int fd = -1;
 
         if (argc != 3) {
@@ -64,6 +70,15 @@ int main(int argc, char **argv)
                                 exit(1);
                         }
                         break;
+               case 'M':
+                       mmap_len = st.st_size;
+                       mmap_ptr = mmap(NULL, mmap_len, PROT_READ, MAP_SHARED, 
+                                       fd, 0);
+                       if (mmap_ptr == MAP_FAILED) {
+                               perror("mmap");
+                               exit(1);
+                       }
+                       break;
                 case 'O':
                         fd = open(fname, O_CREAT|O_RDWR, 0644);
                         if (fd == -1) {
@@ -91,6 +106,10 @@ int main(int argc, char **argv)
                                 exit(1);
                         }
                         break;
+               case 'R':
+                       for (i = 0; i < mmap_len && mmap_ptr; i += 4096)
+                               junk += mmap_ptr[i];
+                       break;
                 case 's':
                         if (stat(fname, &st) == -1) {
                                 perror("stat");
@@ -115,6 +134,12 @@ int main(int argc, char **argv)
                                 exit(1);
                         }
                         break;
+               case 'U':
+                       if (munmap(mmap_ptr, mmap_len)) {
+                               perror("munmap");
+                               exit(1);
+                       }
+                       break;
                case 'w':
                        if (write(fd, "w", 1) == -1) {
                                perror("write");
index 3d618e9..2368f0e 100755 (executable)
@@ -3,67 +3,24 @@
 set -e
 
 LUSTRE=${LUSTRE:-`dirname $0`/..}
-LTESTDIR=${LTESTDIR:-$LUSTRE/../ltest}
-PATH=$PATH:$LUSTRE/utils:$LUSTRE/tests
+. $LUSTRE/tests/test-framework.sh
 
-RLUSTRE=${RLUSTRE:-$LUSTRE}
-RPWD=${RPWD:-$PWD}
-
-. $LTESTDIR/functional/llite/common/common.sh
+init_test_env
 
 # XXX I wish all this stuff was in some default-config.sh somewhere
-MOUNT=${MOUNT:-/mnt/lustre}
 MDSDEV=${MDSDEV:-/tmp/mds-`hostname`}
 MDSSIZE=${MDSSIZE:-100000}
 OSTDEV=${OSTDEV:-/tmp/ost-`hostname`}
 OSTSIZE=${OSTSIZE:-100000}
-MOUNT=${MOUNT:-/mnt/lustre}
 MOUNT1=${MOUNT1:-${MOUNT}1}
 MOUNT2=${MOUNT2:-${MOUNT}2}
+MOUNT=${MOUNT1}
 UPCALL=${UPCALL:-$PWD/replay-single-upcall.sh}
 FSTYPE=${FSTYPE:-ext3}
 TIMEOUT=${TIMEOUT:-5}
 
-start() {
-    facet=$1
-    shift
-    lconf --node ${facet}_facet $@ replay-dual.xml
-}
-
-stop() {
-    facet=$1
-    shift
-    lconf --node ${facet}_facet $@ --cleanup replay-dual.xml
-}
-
-replay_barrier() {
-    local dev=$1
-    sync
-    lctl --device %${dev}1 readonly
-    lctl --device %${dev}1 notransno
-    lctl mark "REPLAY BARRIER"
-}
-
-fail() {
-    local facet=$1
-    lctl mark "FAIL $facet"
-    stop $facet --force --failover --nomod
-    start $facet --nomod
-    lctl mark "RECOVER $facet"
-    df $MOUNT1 | tail -1
-    df $MOUNT2 | tail -1
-}
-
-do_lmc() {
-    lmc -m replay-dual.xml $@
-}
-
-add_facet() {
-    local facet=$1
-    shift
-    do_lmc --add node --node ${facet}_facet $@ --timeout $TIMEOUT
-    do_lmc --add net --node ${facet}_facet --nid localhost --nettype tcp
-}
+STRIPE_BYTES=65536
+STRIPES_PER_OBJ=1
 
 gen_config() {
     rm -f replay-dual.xml
@@ -72,80 +29,11 @@ gen_config() {
     add_facet client1 --lustre_upcall $UPCALL
     add_facet client2 --lustre_upcall $UPCALL
     do_lmc --add mds --node mds_facet --mds mds1 --dev $MDSDEV --size $MDSSIZE
-    do_lmc --add ost --node ost_facet --ost ost1 --dev $OSTDEV --size $OSTSIZE
+    do_lmc --add ost --lov lov1 --node ost_facet --ost ost1 --dev $OSTDEV --size $OSTSIZE
     do_lmc --add mtpt --node client1_facet --path $MOUNT1 --mds mds1 --ost ost1
     do_lmc --add mtpt --node client2_facet --path $MOUNT2 --mds mds1 --ost ost1
 }
-error() {
-    echo '**** FAIL:' $@
-    exit 1
-}
 
-build_test_filter() {
-        for O in $ONLY; do
-            eval ONLY_${O}=true
-        done
-        for E in $EXCEPT $ALWAYS_EXCEPT; do
-            eval EXCEPT_${E}=true
-        done
-}
-
-_basetest() {
-    echo $*
-}
-
-basetest() {
-    IFS=abcdefghijklmnopqrstuvwxyz _basetest $1
-}
-
-run_test() {
-        base=`basetest $1`
-        if [ ! -z "$ONLY" ]; then
-                 testname=ONLY_$1
-                 if [ ${!testname}x != x ]; then
-                     run_one $1 "$2"
-                     return $?
-                 fi
-                 testname=ONLY_$base
-                 if [ ${!testname}x != x ]; then
-                     run_one $1 "$2"
-                     return $?
-                 fi
-                 echo -n "."
-                 return 0
-        fi
-        testname=EXCEPT_$1
-        if [ ${!testname}x != x ]; then
-                 echo "skipping excluded test $1"
-                 return 0
-        fi
-        testname=EXCEPT_$base
-        if [ ${!testname}x != x ]; then
-                 echo "skipping excluded test $1 (base $base)"
-                 return 0
-        fi
-        run_one $1 "$2"
-
-        return $?
-}
-
-EQUALS="======================================================================"
-equals_msg() {
-   msg="$@"
-
-   local suffixlen=$((65 - ${#msg}))
-   printf '===== %s %.*s\n' "$msg" $suffixlen $EQUALS
-}
-
-run_one() {
-    testnum=$1
-    message=$2
-    
-    # Pretty tests run faster.
-    equals_msg $testnum: $message
-
-    test_${testnum} || error "test_$testnum failed with $?"
-}
 
 build_test_filter
 
@@ -156,7 +44,7 @@ PINGER=`cat /proc/fs/lustre/pinger`
 if [ "$PINGER" != "on" ]; then
     echo "ERROR: Lustre must be built with --enable-pinger for replay-dual"
     stop mds
-    exit
+    exit 1
 fi
 
 start ost --reformat
@@ -270,7 +158,7 @@ run_test 6 "open1, open2, unlink |X| close1 [fail mds] close2"
 
 
 equals_msg test complete, cleaning up
-stop client2 --nomod
-stop client1
-stop ost
-stop mds --dump cleanup-dual.log
+stop client2 ${FORCE:=--force} --nomod
+stop client1 ${FORCE}
+stop ost ${FORCE}
+stop mds ${FORCE} --dump cleanup-dual.log
index c55bd7e..b8fffe8 100755 (executable)
@@ -2,21 +2,15 @@
 
 set -e
 
-# attempt to print a useful error location, but the ERR trap isn't
-# exported to functions, and the $LINENO doesn't work in EXIT.
-
-trap 'echo ERROR $0:$FUNCNAME:$LINENO: rc: $?' EXIT
-
 LUSTRE=${LUSTRE:-`dirname $0`/..}
-LTESTDIR=${LTESTDIR:-$LUSTRE/../ltest}
-PATH=$PATH:$LUSTRE/utils:$LUSTRE/tests
+. $LUSTRE/tests/test-framework.sh
 
-RLUSTRE=${RLUSTRE:-$LUSTRE}
-RPWD=${RPWD:-$PWD}
+init_test_env
 
-. $LTESTDIR/functional/llite/common/common.sh
+# Skip these tests
+# 3 - bug 1852
+ALWAYS_EXCEPT="3"
 
-CHECKSTAT="${CHECKSTAT:-checkstat} -v"
 
 # XXX I wish all this stuff was in some default-config.sh somewhere
 MOUNT=${MOUNT:-/mnt/lustre}
@@ -29,43 +23,9 @@ UPCALL=${UPCALL:-$PWD/replay-single-upcall.sh}
 FSTYPE=${FSTYPE:-ext3}
 TIMEOUT=${TIMEOUT:-5}
 
-start() {
-    facet=$1
-    shift
-    lconf --node ${facet}_facet $@ replay-single.xml
-}
-
-stop() {
-    facet=$1
-    shift
-    lconf --node ${facet}_facet $@ --cleanup replay-single.xml
-}
-
-replay_barrier() {
-    local dev=$1
-    sync
-    lctl --device %${dev}1 readonly
-    lctl --device %${dev}1 notransno
-    lctl mark "REPLAY BARRIER"
-}
-
-fail() {
-    local facet=$1
-    stop $facet --force --failover --nomod
-    start $facet --nomod
-    df $MOUNT
-}
+STRIPE_BYTES=65536
+STRIPES_PER_OBJ=1
 
-do_lmc() {
-    lmc -m replay-single.xml $@
-}
-
-add_facet() {
-    local facet=$1
-    shift
-    do_lmc --add node --node ${facet}_facet $@ --timeout $TIMEOUT
-    do_lmc --add net --node ${facet}_facet --nid localhost --nettype tcp
-}
 
 gen_config() {
     rm -f replay-single.xml
@@ -73,146 +33,141 @@ gen_config() {
     add_facet ost
     add_facet client --lustre_upcall $UPCALL
     do_lmc --add mds --node mds_facet --mds mds1 --dev $MDSDEV --size $MDSSIZE
-    do_lmc --add ost --node ost_facet --ost ost1 --dev $OSTDEV --size $OSTSIZE
-    do_lmc --add mtpt --node client_facet --path $MOUNT --mds mds1 --ost ost1
-}
-
-error() {
-    echo '**** FAIL:' $@
-    exit 1
+    do_lmc --add lov --mds mds1 --lov lov1 --stripe_sz $STRIPE_BYTES --stripe_cnt $STRIPES_PER_OBJ --stripe_pattern 0
+    do_lmc --add ost --lov lov1 --node ost_facet --ost ost1 --dev $OSTDEV --size $OSTSIZE
+    do_lmc --add ost --lov lov1 --node ost_facet --ost ost2 --dev ${OSTDEV}-2 --size $OSTSIZE
+    do_lmc --add mtpt --node client_facet --path $MOUNT --mds mds1 --ost lov1
 }
 
-build_test_filter() {
-        for O in $ONLY; do
-            eval ONLY_${O}=true
-        done
-        for E in $EXCEPT $ALWAYS_EXCEPT; do
-            eval EXCEPT_${E}=true
-        done
-}
-
-_basetest() {
-    echo $*
-}
-
-basetest() {
-    IFS=abcdefghijklmnopqrstuvwxyz _basetest $1
-}
-
-run_test() {
-        base=`basetest $1`
-        if [ ! -z "$ONLY" ]; then
-                 testname=ONLY_$1
-                 if [ ${!testname}x != x ]; then
-                     run_one $1 "$2"
-                     return $?
-                 fi
-                 testname=ONLY_$base
-                 if [ ${!testname}x != x ]; then
-                     run_one $1 "$2"
-                     return $?
-                 fi
-                 echo -n "."
-                 return 0
-        fi
-        testname=EXCEPT_$1
-        if [ ${!testname}x != x ]; then
-                 echo "skipping excluded test $1"
-                 return 0
-        fi
-        testname=EXCEPT_$base
-        if [ ${!testname}x != x ]; then
-                 echo "skipping excluded test $1 (base $base)"
-                 return 0
-        fi
-        run_one $1 "$2"
-
-        return $?
-}
-
-EQUALS="======================================================================"
-
-run_one() {
-    testnum=$1
-    message=$2
-
-    # Pretty tests run faster.
-    echo -n '=====' $testnum: $message
-    local suffixlen=`echo -n $2 | awk '{print 65 - length($0)}'`
-    printf ' %.*s\n' $suffixlen $EQUALS
-
-    test_${testnum} || error "test_$testnum failed with $?"
-}
 
 build_test_filter
 
 gen_config
 start mds --reformat $MDSLCONFARGS
 start ost --reformat $OSTLCONFARGS
-start client $CLIENTLCONFARGS
+start client --gdb $CLIENTLCONFARGS
 
 mkdir -p $DIR
 
-test_1() {
+test_0() {
     replay_barrier mds
-    mcreate $DIR/f1
     fail mds
-    $CHECKSTAT -t file $DIR/f1 || error 
-    rm $DIR/f1
 }
-run_test 1 "simple create"
+run_test 0 "empty replay"
 
-test_1a() {
+test_1() {
     replay_barrier mds
-    touch $DIR/f1
+    mcreate $DIR/$tfile
     fail mds
-    $CHECKSTAT -t file $DIR/f1 || error 
-    rm $DIR/f1
+    $CHECKSTAT -t file $DIR/$tfile || return 1
+    rm $DIR/$tfile
 }
-run_test 1 "touch"
+run_test 1 "simple create"
 
 test_2() {
     replay_barrier mds
-    mkdir $DIR/d2
-    mcreate $DIR/d2/f2
+    touch $DIR/$tfile
     fail mds
-    $CHECKSTAT -t dir $DIR/d2 || error 
-    $CHECKSTAT -t file $DIR/d2/f2 || error 
-    rm -fr $DIR/d2
+    $CHECKSTAT -t file $DIR/$tfile || return 1
 }
-run_test 2 "mkdir + contained create"
+run_test 2 "touch"
 
+# bug 1852
 test_3() {
-    mkdir $DIR/d3
     replay_barrier mds
-    mcreate $DIR/d3/f3
+    mcreate $DIR/$tfile
+    o_directory $DIR/$tfile
+    rm -f $DIR/$tfile
     fail mds
-    $CHECKSTAT -t dir $DIR/d3 || error 
-    $CHECKSTAT -t file $DIR/d3/f3 || error 
-    rm -fr $DIR/d3
+    $CHECKSTAT -t file $DIR/$tfile && return 2 || true  # a failing checkstat is the passing case here
 }
-run_test 3 "mkdir |X| contained create"
+run_test 3 "replay failed open"
 
 test_4() {
     replay_barrier mds
-    multiop $DIR/f4 mo_c &
+    for i in `seq 10`; do
+        echo "tag-$i" > $DIR/$tfile-$i
+    done 
+    fail mds
+    for i in `seq 10`; do
+      grep -q "tag-$i" $DIR/$tfile-$i || error "f1c-$i"
+    done 
+}
+run_test 4 "|x| 10 open(O_CREAT)s"
+
+test_4b() {
+    replay_barrier mds
+    rm -rf $DIR/$tfile-*
+    fail mds
+    $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
+}
+run_test 4b "|x| rm 10 files"
+
+# The idea is to get past the first block of precreated files on both 
+# osts, and then replay.
+test_5() {
+    replay_barrier mds
+    for i in `seq 220`; do
+        echo "tag-$i" > $DIR/$tfile-$i
+    done 
+    fail mds
+    for i in `seq 220`; do
+      grep -q "tag-$i" $DIR/$tfile-$i || error "f1c-$i"
+    done 
+    rm -rf $DIR/$tfile-*
+}
+run_test 5 "|x| 220 open(O_CREAT)"
+
+
+test_6() {
+    replay_barrier mds
+    mkdir $DIR/$tdir
+    mcreate $DIR/$tdir/$tfile
+    fail mds
+    $CHECKSTAT -t dir $DIR/$tdir || return 1
+    $CHECKSTAT -t file $DIR/$tdir/$tfile || return 2
+}
+run_test 6 "mkdir + contained create"
+
+test_6b() {
+    replay_barrier mds
+    rm -rf $DIR/$tdir
+    fail mds
+    $CHECKSTAT -t dir $DIR/$tdir && return 1 || true 
+}
+run_test 6b "|X| rmdir"
+
+test_7() {
+    mkdir $DIR/$tdir
+    replay_barrier mds
+    mcreate $DIR/$tdir/$tfile
+    fail mds
+    $CHECKSTAT -t dir $DIR/$tdir || return 1
+    $CHECKSTAT -t file $DIR/$tdir/$tfile || return 2
+    rm -fr $DIR/$tdir
+}
+run_test 7 "mkdir |X| contained create"
+
+test_8() {
+    replay_barrier mds
+    multiop $DIR/$tfile mo_c &
     MULTIPID=$!
     sleep 1
     fail mds
-    ls $DIR/f4
-    $CHECKSTAT -t file $DIR/f4 || error 
-    kill -USR1 $MULTIPID
-    wait
-    rm $DIR/f4
+    ls $DIR/$tfile
+    $CHECKSTAT -t file $DIR/$tfile || return 1
+    kill -USR1 $MULTIPID || return 2
+    wait $MULTIPID || return 3
+    rm $DIR/$tfile
 }
-run_test 4 "open |X| close"
+run_test 8 "creat open |X| close"
 
-test_5() {
+test_9() {
     replay_barrier mds
-    mcreate $DIR/f5
-    local old_inum=`ls -i $DIR/f5 | awk '{print $1}'`
+    mcreate $DIR/$tfile
+    local old_inum=`ls -i $DIR/$tfile | awk '{print $1}'`
     fail mds
-    local new_inum=`ls -i $DIR/f5 | awk '{print $1}'`
+    local new_inum=`ls -i $DIR/$tfile | awk '{print $1}'`
 
     echo " old_inum == $old_inum, new_inum == $new_inum"
     if [ $old_inum -eq $new_inum  ] ;
@@ -220,80 +175,174 @@ test_5() {
         echo " old_inum and new_inum match"
     else
         echo "!!!! old_inum and new_inum NOT match"
-
+        return 1
     fi
-    rm -f $DIR/f5
+    rm $DIR/$tfile
 }
-run_test 5 "|X| create (same inum/gen)"
+run_test 9 "|X| create (same inum/gen)"
 
-test_6() {
-    mcreate $DIR/f6
+test_10() {
+    mcreate $DIR/$tfile
     replay_barrier mds
-    mv $DIR/f6 $DIR/F6
-    rm -f $DIR/F6
+    mv $DIR/$tfile $DIR/$tfile-2
+    rm -f $DIR/$tfile
     fail mds
-    $CHECKSTAT $DIR/f6 && return 1
-    $CHECKSTAT $DIR/F6 && return 2
+    $CHECKSTAT $DIR/$tfile && return 1
+    $CHECKSTAT $DIR/$tfile-2 ||return 2
+    rm $DIR/$tfile-2
     return 0
 }
+run_test 10 "create |X| rename unlink"
 
-run_test 6 "create |X| rename unlink"
-
-test_7() {
-    mcreate $DIR/f7
-    echo "old" > $DIR/f7
-    mv $DIR/f7 $DIR/F7
+test_11() {
+    mcreate $DIR/$tfile
+    echo "old" > $DIR/$tfile
+    mv $DIR/$tfile $DIR/$tfile-2
     replay_barrier mds
-    mcreate $DIR/f7
-    echo "new" > $DIR/f7
-    cat $DIR/f7 | grep new 
-    cat $DIR/F7 | grep old
+    echo "new" > $DIR/$tfile
+    grep new $DIR/$tfile 
+    grep old $DIR/$tfile-2
     fail mds
-    cat $DIR/f7 | grep new
-    cat $DIR/F7 | grep old
+    grep new $DIR/$tfile || return 1
+    grep old $DIR/$tfile-2 || return 2
 }
-run_test 7 "create open write rename |X| create-old-name read"
+run_test 11 "create open write rename |X| create-old-name read"
 
-test_8() {
-    mcreate $DIR/f8 
-    multiop $DIR/f8 o_tSc &
+test_12() {
+    mcreate $DIR/$tfile 
+    multiop $DIR/$tfile o_tSc &
     pid=$!
     # give multiop a chance to open
     sleep 1 
-    rm -f $DIR/f8
+    rm -f $DIR/$tfile
     replay_barrier mds
     kill -USR1 $pid
     wait $pid || return 1
 
     fail mds
-    [ -e $DIR/f8 ] && return 2
+    [ -e $DIR/$tfile ] && return 2
     return 0
 }
-run_test 8 "open, unlink |X| close"
+run_test 12 "open, unlink |X| close"
+
 
 # 1777 - replay open after committed chmod that would make
 #        a regular open a failure    
-test_9() {
-    mcreate $DIR/f9 
-    multiop $DIR/f9 O_wc &
+test_13() {
+    mcreate $DIR/$tfile 
+    multiop $DIR/$tfile O_wc &
     pid=$!
     # give multiop a chance to open
     sleep 1 
-    chmod 0 $DIR/f9
-    $CHECKSTAT -p 0 $DIR/f9
+    chmod 0 $DIR/$tfile
+    $CHECKSTAT -p 0 $DIR/$tfile
     replay_barrier mds
     fail mds
     kill -USR1 $pid
     wait $pid || return 1
 
-    $CHECKSTAT -s 1 $DIR/f9
+    $CHECKSTAT -s 1 -p 0 $DIR/$tfile || return 2
+    return 0
+}
+run_test 13 "open chmod 0 |x| write close"
+
+test_14() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    replay_barrier mds
+    kill -USR1 $pid || return 1
+    wait $pid || return 2
+
+    fail mds
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 14 "open(O_CREAT), unlink |X| close"
+
+test_15() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    replay_barrier mds
+    touch $DIR/g11 || return 1
+    kill -USR1 $pid
+    wait $pid || return 2
+
+    fail mds
+    [ -e $DIR/$tfile ] && return 3
+    touch $DIR/h11 || return 4
+    return 0
+}
+run_test 15 "open(O_CREAT), unlink |X|  touch new, close"
+
+
+test_16() {
+    replay_barrier mds
+    mcreate $DIR/$tfile
+    unlink $DIR/$tfile
+    mcreate $DIR/$tfile-2
+    fail mds
+    [ -e $DIR/$tfile ] && return 1
+    [ -e $DIR/$tfile-2 ] || return 2
+    unlink $DIR/$tfile-2 || return 3
+}
+run_test 16 "|X| open(O_CREAT), unlink, touch new,  unlink new"
+
+test_17() {
+    replay_barrier mds
+    multiop $DIR/$tfile O_c &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    fail mds
+    kill -USR1 $pid || return 1
+    wait $pid || return 2
+    $CHECKSTAT -t file $DIR/$tfile || return 3
+    rm $DIR/$tfile
+}
+run_test 17 "|X| open(O_CREAT), |replay| close"
+
+test_18() {
+    replay_barrier mds
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    touch $DIR/$tfile-2 || return 1
+    kill -USR1 $pid
+    wait $pid || return 2
+
+    fail mds
+    [ -e $DIR/$tfile ] && return 3
+    [ -e $DIR/$tfile-2 ] || return 4
+    # this touch frequently fails
+    touch $DIR/$tfile-3 || return 5
+    unlink $DIR/$tfile-2 || return 6
+    unlink $DIR/$tfile-3 || return 7
     return 0
 }
-run_test 9 "open chmod 0 |x| write close"
+run_test 18 "|X| open(O_CREAT), unlink, touch new, close, touch, unlink"
 
+# bug 1855 (a simpler form of test_11 above)
+test_19() {
+    replay_barrier mds
+    mcreate $DIR/$tfile
+    echo "old" > $DIR/$tfile
+    mv $DIR/$tfile $DIR/$tfile-2
+    grep old $DIR/$tfile-2
+    fail mds
+    grep old $DIR/$tfile-2 || return 2
+}
+run_test 19 "|X| mcreate, open, write, rename "
 
-stop client $CLIENTLCONFARGS
-stop ost
-stop mds $MDSLCONFARGS --dump cleanup.log
+equals_msg test complete, cleaning up
+stop client ${FORCE:=--force} $CLIENTLCONFARGS
+stop ost ${FORCE}
+stop mds ${FORCE} $MDSLCONFARGS --dump cleanup.log
 
-trap - EXIT
index 5bc2d95..ebbb2b3 100644 (file)
@@ -72,7 +72,8 @@ int main (int argc, char **argv) {
        rc = fstat(fd, &st);
        if (rc < 0 || st.st_size != bytes) {
                printf("bad file %lu size first write %lu != %lu: rc %d\n",
-                      st.st_ino, st.st_size, bytes, rc);
+                      (unsigned long)st.st_ino, (unsigned long)st.st_size,
+                       bytes, rc);
                return 1;
        }
 
@@ -91,7 +92,8 @@ int main (int argc, char **argv) {
        rc = fstat(fd, &st);
        if (rc < 0 || st.st_size != bytes + bytes / 2) {
                printf("bad file %lu size second write %lu != %lu: rc %d\n",
-                      st.st_ino, st.st_size, bytes, rc);
+                      (unsigned long)st.st_ino, (unsigned long)st.st_size,
+                       bytes, rc);
                return 1;
        }
 
@@ -111,7 +113,8 @@ int main (int argc, char **argv) {
                rc = fstat(fd, &st);
                if (rc < 0 || st.st_size != bytes + bytes / 2) {
                        printf("bad file size after read %lu != %lu: rc %d\n",
-                              st.st_size, bytes + bytes / 2, rc);
+                              (unsigned long)st.st_size, bytes + bytes / 2,
+                               rc);
                        return 1;
                }