Whamcloud - gitweb
Modify changelog to use LustreNetLink instead of seq_file
authornathan <nathan>
Sat, 19 Sep 2009 18:36:40 +0000 (18:36 +0000)
committernathan <nathan>
Sat, 19 Sep 2009 18:36:40 +0000 (18:36 +0000)
b=20411
i=adilger
i=thomas.leibovici

15 files changed:
libcfs/include/libcfs/libcfs_kernelcomm.h
libcfs/libcfs/linux/linux-kernelcomm.c
libcfs/libcfs/ulinux/ulinux-kernelcomm.c
lustre/include/lprocfs_status.h
lustre/include/lustre/liblustreapi.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre/lustre_user.h
lustre/mdc/lproc_mdc.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_lproc.c
lustre/obdclass/lprocfs_status.c
lustre/tests/lreplicate-test.sh
lustre/utils/lfs.c
lustre/utils/liblustreapi.c
lustre/utils/lreplicate.c

index 8da8c96..5f31d1d 100644 (file)
@@ -59,17 +59,19 @@ struct lnl_hdr {
         __u16 lnl_magic;
         __u8  lnl_transport;  /* Each new Lustre feature should use a different
                                  transport */
-        __u8  padding1;
+        __u8  lnl_flags;
         __u16 lnl_msgtype;    /* Message type or opcode, transport-specific */
         __u16 lnl_msglen;
 } __attribute__((aligned(sizeof(__u64))));
 
 #define LNL_MAGIC  0x191C /*Lustre9etLinC */
+#define LNL_FL_BLOCK 0x01   /* Wait for send */
 
 /* lnl_msgtype values are defined in each transport */
 enum lnl_transport_type {
-        LNL_TRANSPORT_GENERIC = 1,
-        LNL_TRANSPORT_HSM     = 2,
+        LNL_TRANSPORT_GENERIC   = 1,
+        LNL_TRANSPORT_HSM       = 2,
+        LNL_TRANSPORT_CHANGELOG = 3,
 };
 
 enum lnl_generic_message_type {
@@ -79,8 +81,8 @@ enum lnl_generic_message_type {
 /* LNL Broadcast Groups. This determines which userspace process hears which
  * messages.  Mutliple transports may be used within a group, or multiple
  * groups may use the same transport.  Broadcast
- * groups need not be used if e.g. a PID is specified instead.
- * Leaving group 0 unassigned at the moment.
+ * groups need not be used if e.g. a PID is specified instead;
+ * use group 0 to signify unicast.
  */
 #define LNL_GRP_HSM           0x02
 #define LNL_GRP_CNT              2
index cb5457f..b53a712 100644 (file)
@@ -199,10 +199,14 @@ int libcfs_klnl_msg_put(int pid, int group, void *payload)
         if (!skb)
                 return -ENOMEM;
 
-        if (pid)
-                rc = nlmsg_unicast(lnl_socket, skb, pid);
-        else
+        if (pid) {
+                rc = netlink_unicast(lnl_socket, skb, pid,
+                             lnlh->lnl_flags & LNL_FL_BLOCK ? 0 : MSG_DONTWAIT);
+                if (rc > 0)
+                        rc = 0;
+        } else {
                 rc = nlmsg_multicast(lnl_socket, skb, 0, group);
+        }
 
         CDEBUG(0, "Sent message pid=%d, group=%d, rc=%d\n", pid, group, rc);
 
index ab5688b..4d7e09d 100644 (file)
@@ -87,6 +87,7 @@ int libcfs_ulnl_stop(lustre_netlink *link)
 }
 
 /** Read a message from the netlink layer.
+ * Allocates memory, returns handle
  *
  * @param link Private descriptor for pipe/socket.
  * @param maxsize Maximum message size allowed
@@ -124,7 +125,6 @@ int libcfs_ulnl_msg_get(lustre_netlink *link, int maxsize, int transport,
                 /* Read message from kernel */
                 rc = recvmsg(*link, &msg, 0);
                 if (rc <= 0) {
-                        perror("recv");
                         rc = -errno;
                         break;
                 }
index 25e4b98..11bc074 100644 (file)
@@ -697,25 +697,6 @@ extern int lprocfs_quota_rd_qs_factor(char *page, char **start, off_t off,
 extern int lprocfs_quota_wr_qs_factor(struct file *file, const char *buffer,
                                       unsigned long count, void *data);
 
-/** struct for holding changelog data for seq_file processing */
-struct changelog_seq_iter {
-        void *csi_dev;
-        struct llog_ctxt *csi_ctxt;
-        struct llog_handle *csi_llh;
-        __u64 csi_startrec;
-        __u64 csi_endrec;
-        loff_t csi_pos;
-        int csi_wrote;
-        int csi_startcat;
-        int csi_startidx;
-        int csi_fill:1;
-        int csi_done:1;
-};
-int changelog_seq_open(struct inode *inode, struct file *file,
-                       struct changelog_seq_iter **csih);
-int changelog_seq_release(struct inode *inode, struct file *file);
-loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin);
-
 
 
 #else
index 94b8746..20fd103 100644 (file)
@@ -196,7 +196,11 @@ extern int llapi_path2fid(const char *path, lustre_fid *fid);
 
 /* Changelog interface.  priv is private state, managed internally
    by these functions */
-#define CHANGELOG_FLAG_FOLLOW 0x01
+#define CHANGELOG_FLAG_FOLLOW 0x01   /* Not yet implemented */
+#define CHANGELOG_FLAG_BLOCK  0x02   /* Blocking IO makes sense in case of
+   slow user parsing of the records, but it also prevents us from cleaning
+   up if the records are not consumed. */
+
 extern int llapi_changelog_start(void **priv, int flags, const char *mdtname,
                                  long long startrec);
 extern int llapi_changelog_fini(void **priv);
index a3fb371..076217f 100644 (file)
@@ -2301,6 +2301,12 @@ struct changelog_setinfo {
         __u32 cs_id;
 } __attribute__((packed));
 
+struct changelog_show {
+        __u64 cs_startrec;
+        __u32 cs_pid;
+        __u32 cs_flags;
+} __attribute__((packed));
+
 /** changelog record */
 struct llog_changelog_rec {
         struct llog_rec_hdr  cr_hdr;
index a8b5a5c..d1df0ee 100644 (file)
@@ -498,6 +498,7 @@ static inline const char *changelog_type2str(int type) {
 #define CLF_FLAGMASK 0x0FFF
 /* Anything under the flagmask may be per-type (if desired) */
 
+#define CR_MAXSIZE (PATH_MAX + sizeof(struct changelog_rec))
 struct changelog_rec {
         __u16                 cr_namelen;
         __u16                 cr_flags; /**< (flags&CLF_FLAGMASK)|CLF_VERSION */
@@ -519,6 +520,10 @@ struct ioc_changelog_clear {
         __u64 icc_recno;
 };
 
+enum changelog_message_type {
+        CL_RECORD = 10, /* message is a changelog_rec */
+        CL_EOF    = 11, /* at end of current changelog */
+};
 
 /********* Misc **********/
 
index c5cefe4..0258d5f 100644 (file)
@@ -77,60 +77,178 @@ static int mdc_wr_max_rpcs_in_flight(struct file *file, const char *buffer,
         return count;
 }
 
-static int mdc_changelog_seq_release(struct inode *inode, struct file *file)
+static struct lnl_hdr *changelog_lnl_alloc(int len, int flags)
 {
-        struct seq_file *seq = file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
+        struct lnl_hdr *lh;
 
-        if (csi && csi->csi_llh)
-                llog_cat_put(csi->csi_llh);
-        if (csi && csi->csi_ctxt)
-                llog_ctxt_put(csi->csi_ctxt);
+        OBD_ALLOC(lh, len);
+        if (lh == NULL)
+                RETURN(NULL);
 
-        return (changelog_seq_release(inode, file));
+        lh->lnl_magic = LNL_MAGIC;
+        lh->lnl_transport = LNL_TRANSPORT_CHANGELOG;
+        lh->lnl_flags = flags;
+        lh->lnl_msgtype = CL_RECORD;
+        lh->lnl_msglen = len;
+        return lh;
 }
 
-static int mdc_changelog_seq_open(struct inode *inode, struct file *file)
+#define D_CHANGELOG 0
+
+static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
+                             void *data)
 {
-        struct changelog_seq_iter *csi;
-        int rc;
+        struct changelog_show *cs = data;
+        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
+        struct lnl_hdr *lh;
+        int len, rc;
         ENTRY;
 
-        rc = changelog_seq_open(inode, file, &csi);
-        if (rc)
-                RETURN(rc);
+        if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
+            (rec->cr.cr_type >= CL_LAST)) {
+                CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type,
+                       rec->cr.cr_type);
+                RETURN(-EINVAL);
+        }
+
+        if (rec->cr.cr_index < cs->cs_startrec) {
+                /* Skip entries earlier than what we are interested in */
+                CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
+                       rec->cr.cr_index, cs->cs_startrec);
+                RETURN(0);
+        }
+
+        CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
+               " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
+               changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
+               rec->cr.cr_flags & CLF_FLAGMASK,
+               PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
+               rec->cr.cr_namelen, rec->cr.cr_name);
+
+        len = sizeof(*lh) + sizeof(rec->cr) + rec->cr.cr_namelen;
+
+        /* Set up the netlink message */
+        lh = changelog_lnl_alloc(len, cs->cs_flags);
+        if (lh == NULL)
+                RETURN(-ENOMEM);
+        memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
+
+        rc = libcfs_klnl_msg_put(cs->cs_pid, 0, lh);
+        CDEBUG(D_CHANGELOG, "nlmsg pid %d len %d rc %d\n", cs->cs_pid, len, rc);
+
+        OBD_FREE(lh, len);
+
+        RETURN(rc);
+}
+
+static int lproc_mdc_wr_changelog(struct file *file, const char *buffer,
+                                  unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        struct llog_ctxt *ctxt;
+        struct llog_handle *llh;
+        struct lnl_hdr *lnlh;
+        struct changelog_show cs = {};
+        int rc;
+
+        CDEBUG(D_CHANGELOG, "file pid %d\n", file->f_owner.pid);
+
+        if (count != sizeof(cs))
+                return -EINVAL;
+
+        if (copy_from_user(&cs, buffer, sizeof(cs)))
+                return -EFAULT;
+
+        CDEBUG(D_CHANGELOG, "changelog to pid=%d(%d) start "LPU64"\n",
+               cs.cs_pid, file->f_owner.pid, cs.cs_startrec);
 
         /* Set up the remote catalog handle */
-        /* Note the proc file is set up with obd in data, not mdc_device */
-        csi->csi_ctxt = llog_get_context((struct obd_device *)csi->csi_dev,
-                                         LLOG_CHANGELOG_REPL_CTXT);
-        if (csi->csi_ctxt == NULL)
-                GOTO(out, rc = -ENOENT);
-        rc = llog_create(csi->csi_ctxt, &csi->csi_llh, NULL, CHANGELOG_CATALOG);
+        ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
+        if (ctxt == NULL)
+                RETURN(-ENOENT);
+        rc = llog_create(ctxt, &llh, NULL, CHANGELOG_CATALOG);
         if (rc) {
                 CERROR("llog_create() failed %d\n", rc);
                 GOTO(out, rc);
         }
-        rc = llog_init_handle(csi->csi_llh, LLOG_F_IS_CAT, NULL);
+        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
         if (rc) {
                 CERROR("llog_init_handle failed %d\n", rc);
                 GOTO(out, rc);
         }
 
+        rc = llog_cat_process(llh, changelog_show_cb, &cs, 0, 0);
+
+        /* Send EOF */
+        if ((lnlh = changelog_lnl_alloc(sizeof(*lnlh), cs.cs_flags))) {
+                lnlh->lnl_msgtype = CL_EOF;
+                libcfs_klnl_msg_put(cs.cs_pid, 0, lnlh);
+                OBD_FREE(lnlh, sizeof(*lnlh));
+        }
+
 out:
-        if (rc)
-                mdc_changelog_seq_release(inode, file);
-        RETURN(rc);
+        if (llh)
+                llog_cat_put(llh);
+        if (ctxt)
+                llog_ctxt_put(ctxt);
+        if (rc < 0)
+                return rc;
+        return count;
 }
 
-static struct file_operations mdc_changelog_fops = {
-        .owner   = THIS_MODULE,
-        .open    = mdc_changelog_seq_open,
-        .read    = seq_read,
-        .llseek  = changelog_seq_lseek,
-        .release = mdc_changelog_seq_release,
-};
+/* temporary for testing */
+static int mdc_wr_netlink(struct file *file, const char *buffer,
+                          unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        struct lnl_hdr *lh;
+        struct hsm_action_list *hal;
+        struct hsm_action_item *hai;
+        int len;
+        int pid, rc;
+
+        rc = lprocfs_write_helper(buffer, count, &pid);
+        if (rc)
+                return rc;
 
+        if (pid < 0)
+                return -ERANGE;
+        CWARN("message to pid %d\n", pid);
+
+        len = sizeof(*lh) + sizeof(*hal) + MTI_NAME_MAXLEN +
+                /* for mockup below */ 2 * size_round(sizeof(*hai));
+
+        OBD_ALLOC(lh, len);
+
+        lh->lnl_magic = LNL_MAGIC;
+        lh->lnl_transport = LNL_TRANSPORT_HSM;
+        lh->lnl_msgtype = HMT_ACTION_LIST;
+        lh->lnl_msglen = len;
+
+        hal = (struct hsm_action_list *)(lh + 1);
+        hal->hal_version = HAL_VERSION;
+        hal->hal_archive_num = 1;
+        obd_uuid2fsname(hal->hal_fsname, obd->obd_name, MTI_NAME_MAXLEN);
+
+        /* mock up an action list */
+        hal->hal_count = 2;
+        hai = hai_zero(hal);
+        hai->hai_action = HSMA_ARCHIVE;
+        hai->hai_fid.f_oid = 5;
+        hai->hai_len = sizeof(*hai);
+        hai = hai_next(hai);
+        hai->hai_action = HSMA_RESTORE;
+        hai->hai_fid.f_oid = 10;
+        hai->hai_len = sizeof(*hai);
+
+        /* This works for either broadcast or unicast to a single pid */
+        rc = libcfs_klnl_msg_put(pid, pid == 0 ? LNL_GRP_HSM : 0, lh);
+
+        OBD_FREE(lh, len);
+        if (rc < 0)
+                return rc;
+        return count;
+}
 
 static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
@@ -152,7 +270,8 @@ static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
         { "timeouts",        lprocfs_rd_timeouts,    0, 0 },
         { "import",          lprocfs_rd_import,      0, 0 },
         { "state",           lprocfs_rd_state,       0, 0 },
-        { "changelog",       0, 0, 0, &mdc_changelog_fops, 0400 },
+        { "changelog_trigger",0,lproc_mdc_wr_changelog, 0 },
+        { "hsm_nl",          0, mdc_wr_netlink,      0, 0, 0222 },
         { 0 }
 };
 
index 963efd4..0fe780b 100644 (file)
@@ -1659,6 +1659,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
         /* ignore errors */
         libcfs_klnl_start(LNL_TRANSPORT_HSM);
+        libcfs_klnl_start(LNL_TRANSPORT_CHANGELOG);
 
         RETURN(rc);
 
@@ -1728,6 +1729,7 @@ static int mdc_cleanup(struct obd_device *obd)
         struct client_obd *cli = &obd->u.cli;
 
         libcfs_klnl_stop(LNL_TRANSPORT_HSM, LNL_GRP_HSM);
+        libcfs_klnl_stop(LNL_TRANSPORT_CHANGELOG, 0);
 
         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
index d37229a..dfcb921 100644 (file)
@@ -258,153 +258,6 @@ static int lprocfs_rd_changelog_users(char *page, char **start, off_t off,
         return cucb.idx;
 }
 
-/* non-seq version for direct calling by class_process_proc_param */
-static int mdd_changelog_write(struct file *file, const char *buffer,
-                               unsigned long count, void *data)
-{
-        struct mdd_device *mdd = (struct mdd_device *)data;
-        char kernbuf[32];
-        char *end;
-        int rc;
-
-        if (count > (sizeof(kernbuf) - 1))
-                goto out_usage;
-
-        count = min_t(unsigned long, count, sizeof(kernbuf));
-        if (copy_from_user(kernbuf, buffer, count))
-                return -EFAULT;
-
-        kernbuf[count] = '\0';
-        /* strip trailing newline from "echo blah" */
-        if (kernbuf[count - 1] == '\n')
-                kernbuf[count - 1] = '\0';
-
-        /* Forced on/off/purge rec, independent of changelog users! */
-        if (strcmp(kernbuf, "on") == 0) {
-                rc = mdd_changelog_on(mdd, 1);
-        } else if (strcmp(kernbuf, "off") == 0) {
-                rc = mdd_changelog_on(mdd, 0);
-        } else {
-                /* purge to an index */
-                long long unsigned endrec;
-
-                endrec = (long long)simple_strtoull(kernbuf, &end, 0);
-                if (end == kernbuf)
-                        goto out_usage;
-
-                LCONSOLE_INFO("changelog purge to %llu\n", endrec);
-
-                rc = mdd_changelog_llog_cancel(mdd, endrec);
-        }
-
-        if (rc < 0)
-                return rc;
-        return count;
-
-out_usage:
-        CWARN("changelog write usage: [on|off] | <purge_idx (0=all)>\n");
-        return -EINVAL;
-}
-
-static ssize_t mdd_changelog_seq_write(struct file *file, const char *buffer,
-                                       size_t count, loff_t *off)
-{
-        struct seq_file *seq = file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
-        struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
-
-        return mdd_changelog_write(file, buffer, count, mdd);
-}
-
-static int mdd_changelog_done(struct changelog_seq_iter *csi)
-{
-        struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
-        int done = 0;
-
-        spin_lock(&mdd->mdd_cl.mc_lock);
-        done = (csi->csi_endrec >= mdd->mdd_cl.mc_index);
-        spin_unlock(&mdd->mdd_cl.mc_lock);
-        return done;
-}
-
-/* handle nonblocking */
-static ssize_t mdd_changelog_seq_read(struct file *file, char __user *buf,
-                                      size_t count, loff_t *ppos)
-{
-        struct seq_file *seq = (struct seq_file *)file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
-        int rc;
-        ENTRY;
-
-        if ((file->f_flags & O_NONBLOCK) && mdd_changelog_done(csi))
-                RETURN(-EAGAIN);
-
-        csi->csi_done = 0;
-        rc = seq_read(file, buf, count, ppos);
-        RETURN(rc);
-}
-
-/* handle nonblocking */
-static unsigned int mdd_changelog_seq_poll(struct file *file, poll_table *wait)
-{
-        struct seq_file *seq = (struct seq_file *)file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
-        struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
-        ENTRY;
-
-        csi->csi_done = 0;
-        poll_wait(file, &mdd->mdd_cl.mc_waitq, wait);
-        if (!mdd_changelog_done(csi))
-                RETURN(POLLIN | POLLRDNORM);
-
-        RETURN(0);
-}
-
-static int mdd_changelog_seq_open(struct inode *inode, struct file *file)
-{
-        struct changelog_seq_iter *csi;
-        struct obd_device *obd;
-        int rc;
-        ENTRY;
-
-        rc = changelog_seq_open(inode, file, &csi);
-        if (rc)
-                RETURN(rc);
-
-        /* The proc file is set up with mdd in data, not obd */
-        obd = mdd2obd_dev((struct mdd_device *)csi->csi_dev);
-        csi->csi_ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
-        if (csi->csi_ctxt == NULL) {
-                changelog_seq_release(inode, file);
-                RETURN(-ENOENT);
-        }
-        /* The handle is set up in llog_obd_origin_setup */
-        csi->csi_llh = csi->csi_ctxt->loc_handle;
-        RETURN(rc);
-}
-
-static int mdd_changelog_seq_release(struct inode *inode, struct file *file)
-{
-        struct seq_file *seq = file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
-
-        if (csi && csi->csi_ctxt)
-                llog_ctxt_put(csi->csi_ctxt);
-
-        return (changelog_seq_release(inode, file));
-}
-
-/* mdd changelog proc can handle nonblocking ops and writing to purge recs */
-struct file_operations mdd_changelog_fops = {
-        .owner   = THIS_MODULE,
-        .open    = mdd_changelog_seq_open,
-        .read    = mdd_changelog_seq_read,
-        .write   = mdd_changelog_seq_write,
-        .llseek  = changelog_seq_lseek,
-        .poll    = mdd_changelog_seq_poll,
-        .release = mdd_changelog_seq_release,
-};
-
 #ifdef HAVE_QUOTA_SUPPORT
 static int mdd_lprocfs_quota_rd_type(char *page, char **start, off_t off,
                                      int count, int *eof, void *data)
@@ -451,7 +304,6 @@ static struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
         { "changelog_mask",  lprocfs_rd_changelog_mask,
                              lprocfs_wr_changelog_mask, 0 },
         { "changelog_users", lprocfs_rd_changelog_users, 0, 0},
-        { "changelog", 0, mdd_changelog_write, 0, &mdd_changelog_fops, 0600 },
 #ifdef HAVE_QUOTA_SUPPORT
         { "quota_type",      mdd_lprocfs_quota_rd_type,
                              mdd_lprocfs_quota_wr_type, 0 },
index 71da72a..22acf70 100644 (file)
@@ -2257,265 +2257,6 @@ int lprocfs_obd_wr_recovery_maxtime(struct file *file, const char *buffer,
 EXPORT_SYMBOL(lprocfs_obd_wr_recovery_maxtime);
 
 
-/**** Changelogs *****/
-#define D_CHANGELOG 0
-
-/* How many records per seq_show.  Too small, we spawn llog_process threads
-   too often; too large, we run out of buffer space */
-#define CHANGELOG_CHUNK_SIZE 100
-
-static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
-                             void *data)
-{
-        struct seq_file *seq = (struct seq_file *)data;
-        struct changelog_seq_iter *csi = seq->private;
-        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
-        char *ptr;
-        int cnt, rc;
-        ENTRY;
-
-        if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
-            (rec->cr.cr_type >= CL_LAST)) {
-                CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type,
-                       rec->cr.cr_type);
-                RETURN(-EINVAL);
-        }
-
-        CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64" cat=%d:%d start=%d:%d\n",
-               rec->cr.cr_index, csi->csi_startrec,
-               llh->lgh_hdr->llh_cat_idx, llh->lgh_cur_idx,
-               csi->csi_startcat, csi->csi_startidx);
-
-        if (rec->cr.cr_index < csi->csi_startrec)
-                /* Skip entries earlier than what we are interested in */
-                RETURN(0);
-        if (rec->cr.cr_index == csi->csi_startrec) {
-                /* Remember where we started, since seq_read will re-read
-                 * the data when it reallocs space.  Sigh, if only there was
-                 * a way to tell seq_file how big the buf should be in the
-                 * first place...
-                 */
-                csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
-                csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
-        }
-        if (csi->csi_wrote > CHANGELOG_CHUNK_SIZE) {
-                /* Stop at some point with a reasonable seq_file buffer size.
-                 * Start from here the next time.
-                 */
-                csi->csi_endrec = rec->cr.cr_index - 1;
-                csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
-                csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
-                csi->csi_wrote = 0;
-                RETURN(LLOG_PROC_BREAK);
-        }
-
-        CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
-               " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
-               changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
-               rec->cr.cr_flags & CLF_FLAGMASK,
-               PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
-               rec->cr.cr_namelen, rec->cr.cr_name);
-
-        cnt = sizeof(rec->cr) + rec->cr.cr_namelen;
-        ptr = (char *)(&rec->cr);
-        CDEBUG(D_CHANGELOG, "packed rec %d starting at %p\n", cnt, ptr);
-        rc = 0;
-        while ((cnt-- > 0) && (rc == 0)) {
-                rc = seq_putc(seq, *ptr);
-                ptr++;
-        }
-
-        if (rc < 0) {
-                /* Ran out of room in the seq buffer. seq_read will dump
-                 * the whole buffer and re-seq_start with a larger one;
-                 * no point in continuing the llog_process */
-                CDEBUG(D_CHANGELOG, "rec="LPU64" overflow "LPU64"<-"LPU64"\n",
-                       rec->cr.cr_index, csi->csi_startrec, csi->csi_endrec);
-                csi->csi_endrec = csi->csi_startrec - 1;
-                csi->csi_wrote = 0;
-                RETURN(LLOG_PROC_BREAK);
-        }
-
-        csi->csi_wrote++;
-        csi->csi_endrec = rec->cr.cr_index;
-
-        RETURN(0);
-}
-
-static int changelog_seq_show(struct seq_file *seq, void *v)
-{
-        struct changelog_seq_iter *csi = seq->private;
-        int rc;
-        ENTRY;
-
-        if (csi->csi_fill) {
-                /* seq_read wants more data to fill his buffer. But we already
-                   filled the buf as much as we cared to; force seq_read to
-                   accept that by padding with 0's */
-                while (seq_putc(seq, 0) == 0);
-                RETURN(0);
-        }
-
-        /* Since we have to restart the llog_cat_process for each chunk of the
-           seq_ functions, start from where we left off. */
-        rc = llog_cat_process(csi->csi_llh, changelog_show_cb, seq,
-                              csi->csi_startcat, csi->csi_startidx);
-
-        CDEBUG(D_CHANGELOG,"seq_show "LPU64"-"LPU64" cat=%d:%d wrote=%d rc=%d\n",
-               csi->csi_startrec, csi->csi_endrec, csi->csi_startcat,
-               csi->csi_startidx, csi->csi_wrote, rc);
-
-        if (rc == 0)
-                csi->csi_done = 1;
-        if (rc == LLOG_PROC_BREAK)
-                /* more records left, but seq_show must return 0 */
-                rc = 0;
-        RETURN(rc);
-}
-
-static void *changelog_seq_start(struct seq_file *seq, loff_t *pos)
-{
-        struct changelog_seq_iter *csi = seq->private;
-        LASSERT(csi);
-
-        CDEBUG(D_CHANGELOG, "start "LPU64"-"LPU64" pos="LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec, *pos);
-
-        csi->csi_fill = 0;
-
-        if (csi->csi_done)
-                /* no more records, seq_read should return 0 if buffer
-                   is empty */
-                return NULL;
-
-        if (*pos > csi->csi_pos) {
-                /* The seq_read implementation sucks.  It may call start
-                   multiple times, using pos to indicate advances, if any,
-                   by arbitrarily increasing it by 1. So ignore the actual
-                   value of pos, and just register any increase as
-                   "seq_read wants the next values". */
-                csi->csi_startrec = csi->csi_endrec + 1;
-                csi->csi_pos = *pos;
-        }
-        /* else use old startrec/startidx */
-
-        return csi;
-}
-
-static void changelog_seq_stop(struct seq_file *seq, void *v)
-{
-        struct changelog_seq_iter *csi = seq->private;
-
-        CDEBUG(D_CHANGELOG, "stop "LPU64"-"LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec);
-}
-
-static void *changelog_seq_next(struct seq_file *seq, void *v, loff_t *pos)
-{
-        struct changelog_seq_iter *csi = seq->private;
-
-        CDEBUG(D_CHANGELOG, "next "LPU64"-"LPU64" pos="LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec, *pos);
-
-        csi->csi_fill = 1;
-
-        return csi;
-}
-
-static struct seq_operations changelog_sops = {
-        .start = changelog_seq_start,
-        .stop = changelog_seq_stop,
-        .next = changelog_seq_next,
-        .show = changelog_seq_show,
-};
-
-int changelog_seq_open(struct inode *inode, struct file *file,
-                       struct changelog_seq_iter **csih)
-{
-        struct changelog_seq_iter *csi;
-        struct proc_dir_entry *dp = PDE(inode);
-        struct seq_file *seq;
-        int rc;
-
-        LPROCFS_ENTRY_AND_CHECK(dp);
-
-        rc = seq_open(file, &changelog_sops);
-        if (rc) {
-                LPROCFS_EXIT();
-                return rc;
-        }
-
-        OBD_ALLOC_PTR(csi);
-        if (csi == NULL) {
-                lprocfs_seq_release(inode, file);
-                return -ENOMEM;
-        }
-
-        csi->csi_dev = dp->data;
-        seq = file->private_data;
-        seq->private = csi;
-        *csih = csi;
-
-        return rc;
-}
-EXPORT_SYMBOL(changelog_seq_open);
-
-int changelog_seq_release(struct inode *inode, struct file *file)
-{
-        struct seq_file *seq = file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
-
-        if (csi)
-                OBD_FREE_PTR(csi);
-
-        return lprocfs_seq_release(inode, file);
-}
-EXPORT_SYMBOL(changelog_seq_release);
-
-#ifndef SEEK_CUR /* SLES10 needs this */
-#define SEEK_CUR        1
-#define SEEK_END        2
-#endif
-
-loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin)
-{
-        struct seq_file *seq = (struct seq_file *)file->private_data;
-        struct changelog_seq_iter *csi = seq->private;
-
-        CDEBUG(D_CHANGELOG,"seek "LPU64"-"LPU64" off="LPU64":%d fpos="LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec, offset, origin, file->f_pos);
-
-        LL_SEQ_LOCK(seq);
-
-        switch (origin) {
-                case SEEK_CUR:
-                        offset += csi->csi_endrec;
-                        break;
-                case SEEK_END:
-                        /* we don't know the last rec */
-                        offset = -1;
-        }
-
-        /* SEEK_SET */
-
-        if (offset < 0) {
-                LL_SEQ_UNLOCK(seq);
-                return -EINVAL;
-        }
-
-        csi->csi_startrec = offset;
-        csi->csi_endrec = offset ? offset - 1 : 0;
-
-        /* drop whatever is left in sucky seq_read's buffer */
-        seq->count = 0;
-        seq->from = 0;
-        seq->index++;
-        LL_SEQ_UNLOCK(seq);
-        file->f_pos = csi->csi_startrec;
-        return csi->csi_startrec;
-}
-EXPORT_SYMBOL(changelog_seq_lseek);
-
 EXPORT_SYMBOL(lprocfs_register);
 EXPORT_SYMBOL(lprocfs_srch);
 EXPORT_SYMBOL(lprocfs_remove);
index 4e0d261..df8be4b 100644 (file)
@@ -187,7 +187,6 @@ test_1() {
     fini_changelog
     cleanup_src_tgt
     return $RC
-
 }
 run_test 1 "Simple Replication"
 
@@ -518,9 +517,9 @@ test_7() {
     createmany -o $DIR/$tdir/$tfile $NUMFILES
 
     # To simulate replication to another lustre filesystem, replicate
-    # the changes to $DIR/tgt. Disable changelogs before replication
-    # so that the files created as part of replication are not logged.
-    do_facet $SINGLEMDS lctl set_param -n mdd.$MDT0.changelog off
+    # the changes to $DIR/tgt. We can't turn off the changelogs
+    # while we are registered, so lreplicate better not try to 
+    # replicate the replication steps.  It seems ok :)
     mkdir $DIR/tgt
 
     $LREPLICATE -s $DIR -t $DIR/tgt -m $MDT0 -u $CL_USER -l $LREPL_LOG
index c2dc968..b11f16b 100644 (file)
@@ -225,8 +225,7 @@ command_t cmdlist[] = {
          "usage: ls [OPTION]... [FILE]..."},
         {"changelog", lfs_changelog, 0,
          "Show the metadata changes on an MDT."
-         "\nusage: changelog [--follow] <mdtname> [startrec [endrec]]"
-         "\n(note: --follow is only valid when run on MDT node)"},
+         "\nusage: changelog <mdtname> [startrec [endrec]]"},
         {"changelog_clear", lfs_changelog_clear, 0,
          "Indicate that old changelog records up to <endrec> are no longer of "
          "interest to consumer <id>, allowing the system to free up space.\n"
@@ -2367,11 +2366,15 @@ static int lfs_changelog(int argc, char **argv)
                                rec->cr_namelen, rec->cr_name);
                 else
                         printf("\n");
+
                 llapi_changelog_free(&rec);
         }
 
         llapi_changelog_fini(&changelog_priv);
 
+        if (rc < 0)
+                fprintf(stderr, "Changelog: %s\n", strerror(errno = -rc));
+
         return (rc == 1 ? 0 : rc);
 }
 
index 7862288..f8643c6 100644 (file)
@@ -2660,49 +2660,51 @@ static int get_mdtname(char *name, char *format, char *buf)
 #define CHANGELOG_PRIV_MAGIC 0xCA8E1080
 struct changelog_private {
         int magic;
-        int fd;
         int flags;
+        lustre_netlink lnl;
 };
 
 /** Start reading from a changelog
  * @param priv Opaque private control structure
- * @param flags Start flags (e.g. follow)
+ * @param flags Start flags (e.g. CHANGELOG_FLAG_BLOCK)
  * @param device Report changes recorded on this MDT
  * @param startrec Report changes beginning with this record number
+ * (just call llapi_changelog_fini when done; don't need an endrec)
  */
 int llapi_changelog_start(void **priv, int flags, const char *device,
                           long long startrec)
 {
         struct changelog_private *cp;
-        char path[256];
+        struct changelog_show cs = {};
         char mdtname[20];
-        int rc, fd;
+        char pattern[100];
+        char trigger[100];
+        int fd, rc, pid;
 
-        if (device[0] == '/')
-                rc = llapi_search_fsname(device, mdtname);
-        else
-                strncpy(mdtname, device, sizeof(mdtname));
+        /* Find mdtname from path, fsname, mdtname, or mdtname_UUID */
+        if (device[0] == '/') {
+                if ((rc = llapi_search_fsname(device, mdtname)))
+                        return rc;
+                if ((rc = get_mdtname(mdtname, "%s%s", mdtname)) < 0)
+                        return rc;
+        } else {
+                if ((rc = get_mdtname((char *)device, "%s%s", mdtname)) < 0)
+                        return rc;
+        }
 
-        /* Use either the mdd changelog (preferred) or a client mdc changelog */
-        if (get_mdtname(mdtname,
-                        "/proc/fs/lustre/md[cd]/%s%s{,-mdc-*}/changelog",
-                        path) < 0)
-                return -EINVAL;
-        rc = first_match(path, path);
+        /* Find corresponding mdc trigger */
+        snprintf(pattern, PATH_MAX,
+                 "/proc/fs/lustre/mdc/%s-*/changelog_trigger", mdtname);
+        rc = first_match(pattern, trigger);
         if (rc)
                 return rc;
 
-        if ((fd = open(path, O_RDONLY)) < 0) {
-                llapi_err(LLAPI_MSG_ERROR, "error: can't open |%s|\n", path);
-                return -errno;
-        }
-
-        rc = lseek(fd, (off_t)startrec, SEEK_SET);
-        if (rc < 0) {
-                llapi_err(LLAPI_MSG_ERROR, "can't seek rc=%d\n", rc);
+        /* Make sure we can write the trigger */
+        fd = open(trigger, O_WRONLY);
+        if (fd < 0)
                 return -errno;
-        }
 
+        /* Set up the receiver control struct */
         cp = malloc(sizeof(*cp));
         if (cp == NULL) {
                 close(fd);
@@ -2710,11 +2712,39 @@ int llapi_changelog_start(void **priv, int flags, const char *device,
         }
 
         cp->magic = CHANGELOG_PRIV_MAGIC;
-        cp->fd = fd;
         cp->flags = flags;
-        *priv = cp;
+        /* Start the receiver */
+        rc = libcfs_ulnl_start(&cp->lnl, 0 /* unicast */);
+        if (rc < 0)
+                goto out_free;
+
+        /* We need to trigger Lustre to start sending messages now.
+           We could send a lnl message to a kernel listener,
+           or write into proc.  Proc has the advantage of running in this
+           context, avoiding the need for a kernel thread. */
+        cs.cs_pid = getpid();
+        cs.cs_startrec = startrec;
+        cs.cs_flags = flags & CHANGELOG_FLAG_BLOCK ? LNL_FL_BLOCK : 0;
+        if ((pid = fork()) < 0) {
+                goto out_free;
+        } else if (!pid) {
+                /* Write triggers Lustre to start sending, but it
+                   won't return until it is complete, meaning everything
+                   got shipped through lnl (or error).  So we trigger it
+                   from a child process here, allowing the llapi call to
+                   return and wait for the lnl messages. */
+                rc = write(fd, &cs, sizeof(cs));
+                exit(rc);
+        }
 
+        close(fd);
+        *priv = cp;
         return 0;
+
+out_free:
+        free(cp);
+        close(fd);
+        return rc;
 }
 
 /** Finish reading from a changelog */
@@ -2725,22 +2755,12 @@ int llapi_changelog_fini(void **priv)
         if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC))
                 return -EINVAL;
 
-        close(cp->fd);
+        libcfs_ulnl_stop(&cp->lnl);
         free(cp);
         *priv = NULL;
         return 0;
 }
 
-static int pollwait(int fd) {
-        struct pollfd pfds[1];
-        int rc;
-
-        pfds[0].fd = fd;
-        pfds[0].events = POLLIN;
-        rc = poll(pfds, 1, -1);
-        return rc < 0 ? -errno : rc;
-}
-
 /** Read the next changelog entry
  * @param priv Opaque private control structure
  * @param rech Changelog record handle; record will be allocated here
@@ -2751,7 +2771,7 @@ static int pollwait(int fd) {
 int llapi_changelog_recv(void *priv, struct changelog_rec **rech)
 {
         struct changelog_private *cp = (struct changelog_private *)priv;
-        struct changelog_rec rec, *recp;
+        struct lnl_hdr *lnlh;
         int rc = 0;
 
         if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC))
@@ -2759,41 +2779,50 @@ int llapi_changelog_recv(void *priv, struct changelog_rec **rech)
         if (rech == NULL)
                 return -EINVAL;
 
-readrec:
-        /* Read in the rec to get the namelen */
-        rc = read(cp->fd, &rec, sizeof(rec));
+repeat:
+        rc = libcfs_ulnl_msg_get(&cp->lnl, CR_MAXSIZE, LNL_TRANSPORT_CHANGELOG,
+                                 &lnlh);
         if (rc < 0)
-                return -errno;
-        if (rc == 0) {
-                if (cp->flags && CHANGELOG_FLAG_FOLLOW) {
-                        rc = pollwait(cp->fd);
-                        if (rc < 0)
-                                return rc;
-                        goto readrec;
-                }
-                return 1;
+                return rc;
+
+        if ((lnlh->lnl_transport != LNL_TRANSPORT_CHANGELOG) ||
+            ((lnlh->lnl_msgtype != CL_RECORD) &&
+             (lnlh->lnl_msgtype != CL_EOF))) {
+                llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
+                          "Unknown changelog message type %d:%d\n",
+                          lnlh->lnl_transport, lnlh->lnl_msgtype);
+                rc = -EPROTO;
+                goto out_free;
         }
 
-        recp = malloc(sizeof(rec) + rec.cr_namelen);
-        if (recp == NULL)
-                return -ENOMEM;
-        memcpy(recp, &rec, sizeof(rec));
-        rc = read(cp->fd, recp->cr_name, rec.cr_namelen);
-        if (rc < 0) {
-                free(recp);
-                llapi_err(LLAPI_MSG_ERROR, "Can't read entire filename");
-                return -errno;
+        if (lnlh->lnl_msgtype == CL_EOF) {
+                if (cp->flags & CHANGELOG_FLAG_FOLLOW) {
+                        /* Ignore EOFs */
+                        goto repeat;
+                } else {
+                        rc = 1;
+                        goto out_free;
+                }
         }
 
-        *rech = recp;
+        /* Our message is a changelog_rec */
+        *rech = (struct changelog_rec *)(lnlh + 1);
+
         return 0;
+
+out_free:
+        libcfs_ulnl_msg_free(&lnlh);
+        *rech = NULL;
+        return rc;
 }
 
 /** Release the changelog record when done with it. */
 int llapi_changelog_free(struct changelog_rec **rech)
 {
-        if (*rech)
-                free(*rech);
+        if (*rech) {
+                struct lnl_hdr *lnlh = (struct lnl_hdr *)*rech - 1;
+                libcfs_ulnl_msg_free(&lnlh);
+        }
         *rech = NULL;
         return 0;
 }
index da88342..b9aeb49 100644 (file)
@@ -1057,6 +1057,9 @@ int lr_parse_line(void *priv, struct lr_info *info)
         strncpy(info->name, rec->cr_name, rec->cr_namelen);
         info->name[rec->cr_namelen] = '\0';
 
+        if (verbose > 1)
+                printf("Rec %lld: %d %s\n", info->recno, info->type,info->name);
+
         llapi_changelog_free(&rec);
 
         rec_count++;
@@ -1387,8 +1390,8 @@ int lr_replicate()
         lr_print_status(info);
 
         /* Open changelogs for consumption*/
-        rc = llapi_changelog_start(&changelog_priv, 0, status->ls_source_fs,
-                                   status->ls_last_recno);
+        rc = llapi_changelog_start(&changelog_priv, CHANGELOG_FLAG_BLOCK,
+                                   status->ls_source_fs, status->ls_last_recno);
         if (rc < 0) {
                 fprintf(stderr, "Error opening changelog file for fs %s.\n",
                         status->ls_source_fs);