From: nathan Date: Sat, 19 Sep 2009 18:36:40 +0000 (+0000) Subject: Modify changelog to use LustreNetLink instead of seq_file X-Git-Tag: v1_9_270~22 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=84d5f6b6d92b097684110de51e8a3ce022ee7220;p=fs%2Flustre-release.git Modify changelog to use LustreNetLink instead of seq_file b=20411 i=adilger i=thomas.leibovici --- diff --git a/libcfs/include/libcfs/libcfs_kernelcomm.h b/libcfs/include/libcfs/libcfs_kernelcomm.h index 8da8c96..5f31d1d 100644 --- a/libcfs/include/libcfs/libcfs_kernelcomm.h +++ b/libcfs/include/libcfs/libcfs_kernelcomm.h @@ -59,17 +59,19 @@ struct lnl_hdr { __u16 lnl_magic; __u8 lnl_transport; /* Each new Lustre feature should use a different transport */ - __u8 padding1; + __u8 lnl_flags; __u16 lnl_msgtype; /* Message type or opcode, transport-specific */ __u16 lnl_msglen; } __attribute__((aligned(sizeof(__u64)))); #define LNL_MAGIC 0x191C /*Lustre9etLinC */ +#define LNL_FL_BLOCK 0x01 /* Wait for send */ /* lnl_msgtype values are defined in each transport */ enum lnl_transport_type { - LNL_TRANSPORT_GENERIC = 1, - LNL_TRANSPORT_HSM = 2, + LNL_TRANSPORT_GENERIC = 1, + LNL_TRANSPORT_HSM = 2, + LNL_TRANSPORT_CHANGELOG = 3, }; enum lnl_generic_message_type { @@ -79,8 +81,8 @@ enum lnl_generic_message_type { /* LNL Broadcast Groups. This determines which userspace process hears which * messages. Mutliple transports may be used within a group, or multiple * groups may use the same transport. Broadcast - * groups need not be used if e.g. a PID is specified instead. - * Leaving group 0 unassigned at the moment. + * groups need not be used if e.g. a PID is specified instead; + * use group 0 to signify unicast. */ #define LNL_GRP_HSM 0x02 #define LNL_GRP_CNT 2 diff --git a/libcfs/libcfs/linux/linux-kernelcomm.c b/libcfs/libcfs/linux/linux-kernelcomm.c index cb5457f..b53a712 100644 --- a/libcfs/libcfs/linux/linux-kernelcomm.c +++ b/libcfs/libcfs/linux/linux-kernelcomm.c @@ -199,10 +199,14 @@ int libcfs_klnl_msg_put(int pid, int group, void *payload) if (!skb) return -ENOMEM; - if (pid) - rc = nlmsg_unicast(lnl_socket, skb, pid); - else + if (pid) { + rc = netlink_unicast(lnl_socket, skb, pid, + lnlh->lnl_flags & LNL_FL_BLOCK ? 0 : MSG_DONTWAIT); + if (rc > 0) + rc = 0; + } else { rc = nlmsg_multicast(lnl_socket, skb, 0, group); + } CDEBUG(0, "Sent message pid=%d, group=%d, rc=%d\n", pid, group, rc); diff --git a/libcfs/libcfs/ulinux/ulinux-kernelcomm.c b/libcfs/libcfs/ulinux/ulinux-kernelcomm.c index ab5688b..4d7e09d 100644 --- a/libcfs/libcfs/ulinux/ulinux-kernelcomm.c +++ b/libcfs/libcfs/ulinux/ulinux-kernelcomm.c @@ -87,6 +87,7 @@ int libcfs_ulnl_stop(lustre_netlink *link) } /** Read a message from the netlink layer. + * Allocates memory, returns handle * * @param link Private descriptor for pipe/socket. * @param maxsize Maximum message size allowed @@ -124,7 +125,6 @@ int libcfs_ulnl_msg_get(lustre_netlink *link, int maxsize, int transport, /* Read message from kernel */ rc = recvmsg(*link, &msg, 0); if (rc <= 0) { - perror("recv"); rc = -errno; break; } diff --git a/lustre/include/lprocfs_status.h b/lustre/include/lprocfs_status.h index 25e4b98..11bc074 100644 --- a/lustre/include/lprocfs_status.h +++ b/lustre/include/lprocfs_status.h @@ -697,25 +697,6 @@ extern int lprocfs_quota_rd_qs_factor(char *page, char **start, off_t off, extern int lprocfs_quota_wr_qs_factor(struct file *file, const char *buffer, unsigned long count, void *data); -/** struct for holding changelog data for seq_file processing */ -struct changelog_seq_iter { - void *csi_dev; - struct llog_ctxt *csi_ctxt; - struct llog_handle *csi_llh; - __u64 csi_startrec; - __u64 csi_endrec; - loff_t csi_pos; - int csi_wrote; - int csi_startcat; - int csi_startidx; - int csi_fill:1; - int csi_done:1; -}; -int changelog_seq_open(struct inode *inode, struct file *file, - struct changelog_seq_iter **csih); -int changelog_seq_release(struct inode *inode, struct file *file); -loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin); - #else diff --git a/lustre/include/lustre/liblustreapi.h b/lustre/include/lustre/liblustreapi.h index 94b8746..20fd103 100644 --- a/lustre/include/lustre/liblustreapi.h +++ b/lustre/include/lustre/liblustreapi.h @@ -196,7 +196,11 @@ extern int llapi_path2fid(const char *path, lustre_fid *fid); /* Changelog interface. priv is private state, managed internally by these functions */ -#define CHANGELOG_FLAG_FOLLOW 0x01 +#define CHANGELOG_FLAG_FOLLOW 0x01 /* Not yet implemented */ +#define CHANGELOG_FLAG_BLOCK 0x02 /* Blocking IO makes sense in case of + slow user parsing of the records, but it also prevents us from cleaning + up if the records are not consumed. */ + extern int llapi_changelog_start(void **priv, int flags, const char *mdtname, long long startrec); extern int llapi_changelog_fini(void **priv); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index a3fb371..076217f 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2301,6 +2301,12 @@ struct changelog_setinfo { __u32 cs_id; } __attribute__((packed)); +struct changelog_show { + __u64 cs_startrec; + __u32 cs_pid; + __u32 cs_flags; +} __attribute__((packed)); + /** changelog record */ struct llog_changelog_rec { struct llog_rec_hdr cr_hdr; diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index a8b5a5c..d1df0ee 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -498,6 +498,7 @@ static inline const char *changelog_type2str(int type) { #define CLF_FLAGMASK 0x0FFF /* Anything under the flagmask may be per-type (if desired) */ +#define CR_MAXSIZE (PATH_MAX + sizeof(struct changelog_rec)) struct changelog_rec { __u16 cr_namelen; __u16 cr_flags; /**< (flags&CLF_FLAGMASK)|CLF_VERSION */ @@ -519,6 +520,10 @@ struct ioc_changelog_clear { __u64 icc_recno; }; +enum changelog_message_type { + CL_RECORD = 10, /* message is a changelog_rec */ + CL_EOF = 11, /* at end of current changelog */ +}; /********* Misc **********/ diff --git a/lustre/mdc/lproc_mdc.c b/lustre/mdc/lproc_mdc.c index c5cefe4..0258d5f 100644 --- a/lustre/mdc/lproc_mdc.c +++ b/lustre/mdc/lproc_mdc.c @@ -77,60 +77,178 @@ static int mdc_wr_max_rpcs_in_flight(struct file *file, const char *buffer, return count; } -static int mdc_changelog_seq_release(struct inode *inode, struct file *file) +static struct lnl_hdr *changelog_lnl_alloc(int len, int flags) { - struct seq_file *seq = file->private_data; - struct changelog_seq_iter *csi = seq->private; + struct lnl_hdr *lh; - if (csi && csi->csi_llh) - llog_cat_put(csi->csi_llh); - if (csi && csi->csi_ctxt) - llog_ctxt_put(csi->csi_ctxt); + OBD_ALLOC(lh, len); + if (lh == NULL) + RETURN(NULL); - return (changelog_seq_release(inode, file)); + lh->lnl_magic = LNL_MAGIC; + lh->lnl_transport = LNL_TRANSPORT_CHANGELOG; + lh->lnl_flags = flags; + lh->lnl_msgtype = CL_RECORD; + lh->lnl_msglen = len; + return lh; } -static int mdc_changelog_seq_open(struct inode *inode, struct file *file) +#define D_CHANGELOG 0 + +static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr, + void *data) { - struct changelog_seq_iter *csi; - int rc; + struct changelog_show *cs = data; + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct lnl_hdr *lh; + int len, rc; ENTRY; - rc = changelog_seq_open(inode, file, &csi); - if (rc) - RETURN(rc); + if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) || + (rec->cr.cr_type >= CL_LAST)) { + CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type, + rec->cr.cr_type); + RETURN(-EINVAL); + } + + if (rec->cr.cr_index < cs->cs_startrec) { + /* Skip entries earlier than what we are interested in */ + CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n", + rec->cr.cr_index, cs->cs_startrec); + RETURN(0); + } + + CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID + " %.*s\n", rec->cr.cr_index, rec->cr.cr_type, + changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, + rec->cr.cr_flags & CLF_FLAGMASK, + PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), + rec->cr.cr_namelen, rec->cr.cr_name); + + len = sizeof(*lh) + sizeof(rec->cr) + rec->cr.cr_namelen; + + /* Set up the netlink message */ + lh = changelog_lnl_alloc(len, cs->cs_flags); + if (lh == NULL) + RETURN(-ENOMEM); + memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); + + rc = libcfs_klnl_msg_put(cs->cs_pid, 0, lh); + CDEBUG(D_CHANGELOG, "nlmsg pid %d len %d rc %d\n", cs->cs_pid, len, rc); + + OBD_FREE(lh, len); + + RETURN(rc); +} + +static int lproc_mdc_wr_changelog(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + struct llog_ctxt *ctxt; + struct llog_handle *llh; + struct lnl_hdr *lnlh; + struct changelog_show cs = {}; + int rc; + + CDEBUG(D_CHANGELOG, "file pid %d\n", file->f_owner.pid); + + if (count != sizeof(cs)) + return -EINVAL; + + if (copy_from_user(&cs, buffer, sizeof(cs))) + return -EFAULT; + + CDEBUG(D_CHANGELOG, "changelog to pid=%d(%d) start "LPU64"\n", + cs.cs_pid, file->f_owner.pid, cs.cs_startrec); /* Set up the remote catalog handle */ - /* Note the proc file is set up with obd in data, not mdc_device */ - csi->csi_ctxt = llog_get_context((struct obd_device *)csi->csi_dev, - LLOG_CHANGELOG_REPL_CTXT); - if (csi->csi_ctxt == NULL) - GOTO(out, rc = -ENOENT); - rc = llog_create(csi->csi_ctxt, &csi->csi_llh, NULL, CHANGELOG_CATALOG); + ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt == NULL) + RETURN(-ENOENT); + rc = llog_create(ctxt, &llh, NULL, CHANGELOG_CATALOG); if (rc) { CERROR("llog_create() failed %d\n", rc); GOTO(out, rc); } - rc = llog_init_handle(csi->csi_llh, LLOG_F_IS_CAT, NULL); + rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); if (rc) { CERROR("llog_init_handle failed %d\n", rc); GOTO(out, rc); } + rc = llog_cat_process(llh, changelog_show_cb, &cs, 0, 0); + + /* Send EOF */ + if ((lnlh = changelog_lnl_alloc(sizeof(*lnlh), cs.cs_flags))) { + lnlh->lnl_msgtype = CL_EOF; + libcfs_klnl_msg_put(cs.cs_pid, 0, lnlh); + OBD_FREE(lnlh, sizeof(*lnlh)); + } + out: - if (rc) - mdc_changelog_seq_release(inode, file); - RETURN(rc); + if (llh) + llog_cat_put(llh); + if (ctxt) + llog_ctxt_put(ctxt); + if (rc < 0) + return rc; + return count; } -static struct file_operations mdc_changelog_fops = { - .owner = THIS_MODULE, - .open = mdc_changelog_seq_open, - .read = seq_read, - .llseek = changelog_seq_lseek, - .release = mdc_changelog_seq_release, -}; +/* temporary for testing */ +static int mdc_wr_netlink(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + struct lnl_hdr *lh; + struct hsm_action_list *hal; + struct hsm_action_item *hai; + int len; + int pid, rc; + + rc = lprocfs_write_helper(buffer, count, &pid); + if (rc) + return rc; + if (pid < 0) + return -ERANGE; + CWARN("message to pid %d\n", pid); + + len = sizeof(*lh) + sizeof(*hal) + MTI_NAME_MAXLEN + + /* for mockup below */ 2 * size_round(sizeof(*hai)); + + OBD_ALLOC(lh, len); + + lh->lnl_magic = LNL_MAGIC; + lh->lnl_transport = LNL_TRANSPORT_HSM; + lh->lnl_msgtype = HMT_ACTION_LIST; + lh->lnl_msglen = len; + + hal = (struct hsm_action_list *)(lh + 1); + hal->hal_version = HAL_VERSION; + hal->hal_archive_num = 1; + obd_uuid2fsname(hal->hal_fsname, obd->obd_name, MTI_NAME_MAXLEN); + + /* mock up an action list */ + hal->hal_count = 2; + hai = hai_zero(hal); + hai->hai_action = HSMA_ARCHIVE; + hai->hai_fid.f_oid = 5; + hai->hai_len = sizeof(*hai); + hai = hai_next(hai); + hai->hai_action = HSMA_RESTORE; + hai->hai_fid.f_oid = 10; + hai->hai_len = sizeof(*hai); + + /* This works for either broadcast or unicast to a single pid */ + rc = libcfs_klnl_msg_put(pid, pid == 0 ? LNL_GRP_HSM : 0, lh); + + OBD_FREE(lh, len); + if (rc < 0) + return rc; + return count; +} static struct lprocfs_vars lprocfs_mdc_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, @@ -152,7 +270,8 @@ static struct lprocfs_vars lprocfs_mdc_obd_vars[] = { { "timeouts", lprocfs_rd_timeouts, 0, 0 }, { "import", lprocfs_rd_import, 0, 0 }, { "state", lprocfs_rd_state, 0, 0 }, - { "changelog", 0, 0, 0, &mdc_changelog_fops, 0400 }, + { "changelog_trigger",0,lproc_mdc_wr_changelog, 0 }, + { "hsm_nl", 0, mdc_wr_netlink, 0, 0, 0222 }, { 0 } }; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 963efd4..0fe780b 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1659,6 +1659,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) /* ignore errors */ libcfs_klnl_start(LNL_TRANSPORT_HSM); + libcfs_klnl_start(LNL_TRANSPORT_CHANGELOG); RETURN(rc); @@ -1728,6 +1729,7 @@ static int mdc_cleanup(struct obd_device *obd) struct client_obd *cli = &obd->u.cli; libcfs_klnl_stop(LNL_TRANSPORT_HSM, LNL_GRP_HSM); + libcfs_klnl_stop(LNL_TRANSPORT_CHANGELOG, 0); OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); diff --git a/lustre/mdd/mdd_lproc.c b/lustre/mdd/mdd_lproc.c index d37229a..dfcb921 100644 --- a/lustre/mdd/mdd_lproc.c +++ b/lustre/mdd/mdd_lproc.c @@ -258,153 +258,6 @@ static int lprocfs_rd_changelog_users(char *page, char **start, off_t off, return cucb.idx; } -/* non-seq version for direct calling by class_process_proc_param */ -static int mdd_changelog_write(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct mdd_device *mdd = (struct mdd_device *)data; - char kernbuf[32]; - char *end; - int rc; - - if (count > (sizeof(kernbuf) - 1)) - goto out_usage; - - count = min_t(unsigned long, count, sizeof(kernbuf)); - if (copy_from_user(kernbuf, buffer, count)) - return -EFAULT; - - kernbuf[count] = '\0'; - /* strip trailing newline from "echo blah" */ - if (kernbuf[count - 1] == '\n') - kernbuf[count - 1] = '\0'; - - /* Forced on/off/purge rec, independent of changelog users! */ - if (strcmp(kernbuf, "on") == 0) { - rc = mdd_changelog_on(mdd, 1); - } else if (strcmp(kernbuf, "off") == 0) { - rc = mdd_changelog_on(mdd, 0); - } else { - /* purge to an index */ - long long unsigned endrec; - - endrec = (long long)simple_strtoull(kernbuf, &end, 0); - if (end == kernbuf) - goto out_usage; - - LCONSOLE_INFO("changelog purge to %llu\n", endrec); - - rc = mdd_changelog_llog_cancel(mdd, endrec); - } - - if (rc < 0) - return rc; - return count; - -out_usage: - CWARN("changelog write usage: [on|off] | \n"); - return -EINVAL; -} - -static ssize_t mdd_changelog_seq_write(struct file *file, const char *buffer, - size_t count, loff_t *off) -{ - struct seq_file *seq = file->private_data; - struct changelog_seq_iter *csi = seq->private; - struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev; - - return mdd_changelog_write(file, buffer, count, mdd); -} - -static int mdd_changelog_done(struct changelog_seq_iter *csi) -{ - struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev; - int done = 0; - - spin_lock(&mdd->mdd_cl.mc_lock); - done = (csi->csi_endrec >= mdd->mdd_cl.mc_index); - spin_unlock(&mdd->mdd_cl.mc_lock); - return done; -} - -/* handle nonblocking */ -static ssize_t mdd_changelog_seq_read(struct file *file, char __user *buf, - size_t count, loff_t *ppos) -{ - struct seq_file *seq = (struct seq_file *)file->private_data; - struct changelog_seq_iter *csi = seq->private; - int rc; - ENTRY; - - if ((file->f_flags & O_NONBLOCK) && mdd_changelog_done(csi)) - RETURN(-EAGAIN); - - csi->csi_done = 0; - rc = seq_read(file, buf, count, ppos); - RETURN(rc); -} - -/* handle nonblocking */ -static unsigned int mdd_changelog_seq_poll(struct file *file, poll_table *wait) -{ - struct seq_file *seq = (struct seq_file *)file->private_data; - struct changelog_seq_iter *csi = seq->private; - struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev; - ENTRY; - - csi->csi_done = 0; - poll_wait(file, &mdd->mdd_cl.mc_waitq, wait); - if (!mdd_changelog_done(csi)) - RETURN(POLLIN | POLLRDNORM); - - RETURN(0); -} - -static int mdd_changelog_seq_open(struct inode *inode, struct file *file) -{ - struct changelog_seq_iter *csi; - struct obd_device *obd; - int rc; - ENTRY; - - rc = changelog_seq_open(inode, file, &csi); - if (rc) - RETURN(rc); - - /* The proc file is set up with mdd in data, not obd */ - obd = mdd2obd_dev((struct mdd_device *)csi->csi_dev); - csi->csi_ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); - if (csi->csi_ctxt == NULL) { - changelog_seq_release(inode, file); - RETURN(-ENOENT); - } - /* The handle is set up in llog_obd_origin_setup */ - csi->csi_llh = csi->csi_ctxt->loc_handle; - RETURN(rc); -} - -static int mdd_changelog_seq_release(struct inode *inode, struct file *file) -{ - struct seq_file *seq = file->private_data; - struct changelog_seq_iter *csi = seq->private; - - if (csi && csi->csi_ctxt) - llog_ctxt_put(csi->csi_ctxt); - - return (changelog_seq_release(inode, file)); -} - -/* mdd changelog proc can handle nonblocking ops and writing to purge recs */ -struct file_operations mdd_changelog_fops = { - .owner = THIS_MODULE, - .open = mdd_changelog_seq_open, - .read = mdd_changelog_seq_read, - .write = mdd_changelog_seq_write, - .llseek = changelog_seq_lseek, - .poll = mdd_changelog_seq_poll, - .release = mdd_changelog_seq_release, -}; - #ifdef HAVE_QUOTA_SUPPORT static int mdd_lprocfs_quota_rd_type(char *page, char **start, off_t off, int count, int *eof, void *data) @@ -451,7 +304,6 @@ static struct lprocfs_vars lprocfs_mdd_obd_vars[] = { { "changelog_mask", lprocfs_rd_changelog_mask, lprocfs_wr_changelog_mask, 0 }, { "changelog_users", lprocfs_rd_changelog_users, 0, 0}, - { "changelog", 0, mdd_changelog_write, 0, &mdd_changelog_fops, 0600 }, #ifdef HAVE_QUOTA_SUPPORT { "quota_type", mdd_lprocfs_quota_rd_type, mdd_lprocfs_quota_wr_type, 0 }, diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 71da72a..22acf70 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -2257,265 +2257,6 @@ int lprocfs_obd_wr_recovery_maxtime(struct file *file, const char *buffer, EXPORT_SYMBOL(lprocfs_obd_wr_recovery_maxtime); -/**** Changelogs *****/ -#define D_CHANGELOG 0 - -/* How many records per seq_show. Too small, we spawn llog_process threads - too often; too large, we run out of buffer space */ -#define CHANGELOG_CHUNK_SIZE 100 - -static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr, - void *data) -{ - struct seq_file *seq = (struct seq_file *)data; - struct changelog_seq_iter *csi = seq->private; - struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; - char *ptr; - int cnt, rc; - ENTRY; - - if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) || - (rec->cr.cr_type >= CL_LAST)) { - CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type, - rec->cr.cr_type); - RETURN(-EINVAL); - } - - CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64" cat=%d:%d start=%d:%d\n", - rec->cr.cr_index, csi->csi_startrec, - llh->lgh_hdr->llh_cat_idx, llh->lgh_cur_idx, - csi->csi_startcat, csi->csi_startidx); - - if (rec->cr.cr_index < csi->csi_startrec) - /* Skip entries earlier than what we are interested in */ - RETURN(0); - if (rec->cr.cr_index == csi->csi_startrec) { - /* Remember where we started, since seq_read will re-read - * the data when it reallocs space. Sigh, if only there was - * a way to tell seq_file how big the buf should be in the - * first place... - */ - csi->csi_startcat = llh->lgh_hdr->llh_cat_idx; - csi->csi_startidx = rec->cr_hdr.lrh_index - 1; - } - if (csi->csi_wrote > CHANGELOG_CHUNK_SIZE) { - /* Stop at some point with a reasonable seq_file buffer size. - * Start from here the next time. - */ - csi->csi_endrec = rec->cr.cr_index - 1; - csi->csi_startcat = llh->lgh_hdr->llh_cat_idx; - csi->csi_startidx = rec->cr_hdr.lrh_index - 1; - csi->csi_wrote = 0; - RETURN(LLOG_PROC_BREAK); - } - - CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID - " %.*s\n", rec->cr.cr_index, rec->cr.cr_type, - changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, - rec->cr.cr_flags & CLF_FLAGMASK, - PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), - rec->cr.cr_namelen, rec->cr.cr_name); - - cnt = sizeof(rec->cr) + rec->cr.cr_namelen; - ptr = (char *)(&rec->cr); - CDEBUG(D_CHANGELOG, "packed rec %d starting at %p\n", cnt, ptr); - rc = 0; - while ((cnt-- > 0) && (rc == 0)) { - rc = seq_putc(seq, *ptr); - ptr++; - } - - if (rc < 0) { - /* Ran out of room in the seq buffer. seq_read will dump - * the whole buffer and re-seq_start with a larger one; - * no point in continuing the llog_process */ - CDEBUG(D_CHANGELOG, "rec="LPU64" overflow "LPU64"<-"LPU64"\n", - rec->cr.cr_index, csi->csi_startrec, csi->csi_endrec); - csi->csi_endrec = csi->csi_startrec - 1; - csi->csi_wrote = 0; - RETURN(LLOG_PROC_BREAK); - } - - csi->csi_wrote++; - csi->csi_endrec = rec->cr.cr_index; - - RETURN(0); -} - -static int changelog_seq_show(struct seq_file *seq, void *v) -{ - struct changelog_seq_iter *csi = seq->private; - int rc; - ENTRY; - - if (csi->csi_fill) { - /* seq_read wants more data to fill his buffer. But we already - filled the buf as much as we cared to; force seq_read to - accept that by padding with 0's */ - while (seq_putc(seq, 0) == 0); - RETURN(0); - } - - /* Since we have to restart the llog_cat_process for each chunk of the - seq_ functions, start from where we left off. */ - rc = llog_cat_process(csi->csi_llh, changelog_show_cb, seq, - csi->csi_startcat, csi->csi_startidx); - - CDEBUG(D_CHANGELOG,"seq_show "LPU64"-"LPU64" cat=%d:%d wrote=%d rc=%d\n", - csi->csi_startrec, csi->csi_endrec, csi->csi_startcat, - csi->csi_startidx, csi->csi_wrote, rc); - - if (rc == 0) - csi->csi_done = 1; - if (rc == LLOG_PROC_BREAK) - /* more records left, but seq_show must return 0 */ - rc = 0; - RETURN(rc); -} - -static void *changelog_seq_start(struct seq_file *seq, loff_t *pos) -{ - struct changelog_seq_iter *csi = seq->private; - LASSERT(csi); - - CDEBUG(D_CHANGELOG, "start "LPU64"-"LPU64" pos="LPU64"\n", - csi->csi_startrec, csi->csi_endrec, *pos); - - csi->csi_fill = 0; - - if (csi->csi_done) - /* no more records, seq_read should return 0 if buffer - is empty */ - return NULL; - - if (*pos > csi->csi_pos) { - /* The seq_read implementation sucks. It may call start - multiple times, using pos to indicate advances, if any, - by arbitrarily increasing it by 1. So ignore the actual - value of pos, and just register any increase as - "seq_read wants the next values". */ - csi->csi_startrec = csi->csi_endrec + 1; - csi->csi_pos = *pos; - } - /* else use old startrec/startidx */ - - return csi; -} - -static void changelog_seq_stop(struct seq_file *seq, void *v) -{ - struct changelog_seq_iter *csi = seq->private; - - CDEBUG(D_CHANGELOG, "stop "LPU64"-"LPU64"\n", - csi->csi_startrec, csi->csi_endrec); -} - -static void *changelog_seq_next(struct seq_file *seq, void *v, loff_t *pos) -{ - struct changelog_seq_iter *csi = seq->private; - - CDEBUG(D_CHANGELOG, "next "LPU64"-"LPU64" pos="LPU64"\n", - csi->csi_startrec, csi->csi_endrec, *pos); - - csi->csi_fill = 1; - - return csi; -} - -static struct seq_operations changelog_sops = { - .start = changelog_seq_start, - .stop = changelog_seq_stop, - .next = changelog_seq_next, - .show = changelog_seq_show, -}; - -int changelog_seq_open(struct inode *inode, struct file *file, - struct changelog_seq_iter **csih) -{ - struct changelog_seq_iter *csi; - struct proc_dir_entry *dp = PDE(inode); - struct seq_file *seq; - int rc; - - LPROCFS_ENTRY_AND_CHECK(dp); - - rc = seq_open(file, &changelog_sops); - if (rc) { - LPROCFS_EXIT(); - return rc; - } - - OBD_ALLOC_PTR(csi); - if (csi == NULL) { - lprocfs_seq_release(inode, file); - return -ENOMEM; - } - - csi->csi_dev = dp->data; - seq = file->private_data; - seq->private = csi; - *csih = csi; - - return rc; -} -EXPORT_SYMBOL(changelog_seq_open); - -int changelog_seq_release(struct inode *inode, struct file *file) -{ - struct seq_file *seq = file->private_data; - struct changelog_seq_iter *csi = seq->private; - - if (csi) - OBD_FREE_PTR(csi); - - return lprocfs_seq_release(inode, file); -} -EXPORT_SYMBOL(changelog_seq_release); - -#ifndef SEEK_CUR /* SLES10 needs this */ -#define SEEK_CUR 1 -#define SEEK_END 2 -#endif - -loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin) -{ - struct seq_file *seq = (struct seq_file *)file->private_data; - struct changelog_seq_iter *csi = seq->private; - - CDEBUG(D_CHANGELOG,"seek "LPU64"-"LPU64" off="LPU64":%d fpos="LPU64"\n", - csi->csi_startrec, csi->csi_endrec, offset, origin, file->f_pos); - - LL_SEQ_LOCK(seq); - - switch (origin) { - case SEEK_CUR: - offset += csi->csi_endrec; - break; - case SEEK_END: - /* we don't know the last rec */ - offset = -1; - } - - /* SEEK_SET */ - - if (offset < 0) { - LL_SEQ_UNLOCK(seq); - return -EINVAL; - } - - csi->csi_startrec = offset; - csi->csi_endrec = offset ? offset - 1 : 0; - - /* drop whatever is left in sucky seq_read's buffer */ - seq->count = 0; - seq->from = 0; - seq->index++; - LL_SEQ_UNLOCK(seq); - file->f_pos = csi->csi_startrec; - return csi->csi_startrec; -} -EXPORT_SYMBOL(changelog_seq_lseek); - EXPORT_SYMBOL(lprocfs_register); EXPORT_SYMBOL(lprocfs_srch); EXPORT_SYMBOL(lprocfs_remove); diff --git a/lustre/tests/lreplicate-test.sh b/lustre/tests/lreplicate-test.sh index 4e0d261..df8be4b 100644 --- a/lustre/tests/lreplicate-test.sh +++ b/lustre/tests/lreplicate-test.sh @@ -187,7 +187,6 @@ test_1() { fini_changelog cleanup_src_tgt return $RC - } run_test 1 "Simple Replication" @@ -518,9 +517,9 @@ test_7() { createmany -o $DIR/$tdir/$tfile $NUMFILES # To simulate replication to another lustre filesystem, replicate - # the changes to $DIR/tgt. Disable changelogs before replication - # so that the files created as part of replication are not logged. - do_facet $SINGLEMDS lctl set_param -n mdd.$MDT0.changelog off + # the changes to $DIR/tgt. We can't turn off the changelogs + # while we are registered, so lreplicate better not try to + # replicate the replication steps. It seems ok :) mkdir $DIR/tgt $LREPLICATE -s $DIR -t $DIR/tgt -m $MDT0 -u $CL_USER -l $LREPL_LOG diff --git a/lustre/utils/lfs.c b/lustre/utils/lfs.c index c2dc968..b11f16b 100644 --- a/lustre/utils/lfs.c +++ b/lustre/utils/lfs.c @@ -225,8 +225,7 @@ command_t cmdlist[] = { "usage: ls [OPTION]... [FILE]..."}, {"changelog", lfs_changelog, 0, "Show the metadata changes on an MDT." - "\nusage: changelog [--follow] [startrec [endrec]]" - "\n(note: --follow is only valid when run on MDT node)"}, + "\nusage: changelog [startrec [endrec]]"}, {"changelog_clear", lfs_changelog_clear, 0, "Indicate that old changelog records up to are no longer of " "interest to consumer , allowing the system to free up space.\n" @@ -2367,11 +2366,15 @@ static int lfs_changelog(int argc, char **argv) rec->cr_namelen, rec->cr_name); else printf("\n"); + llapi_changelog_free(&rec); } llapi_changelog_fini(&changelog_priv); + if (rc < 0) + fprintf(stderr, "Changelog: %s\n", strerror(errno = -rc)); + return (rc == 1 ? 0 : rc); } diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index 7862288..f8643c6 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -2660,49 +2660,51 @@ static int get_mdtname(char *name, char *format, char *buf) #define CHANGELOG_PRIV_MAGIC 0xCA8E1080 struct changelog_private { int magic; - int fd; int flags; + lustre_netlink lnl; }; /** Start reading from a changelog * @param priv Opaque private control structure - * @param flags Start flags (e.g. follow) + * @param flags Start flags (e.g. CHANGELOG_FLAG_BLOCK) * @param device Report changes recorded on this MDT * @param startrec Report changes beginning with this record number + * (just call llapi_changelog_fini when done; don't need an endrec) */ int llapi_changelog_start(void **priv, int flags, const char *device, long long startrec) { struct changelog_private *cp; - char path[256]; + struct changelog_show cs = {}; char mdtname[20]; - int rc, fd; + char pattern[100]; + char trigger[100]; + int fd, rc, pid; - if (device[0] == '/') - rc = llapi_search_fsname(device, mdtname); - else - strncpy(mdtname, device, sizeof(mdtname)); + /* Find mdtname from path, fsname, mdtname, or mdtname_UUID */ + if (device[0] == '/') { + if ((rc = llapi_search_fsname(device, mdtname))) + return rc; + if ((rc = get_mdtname(mdtname, "%s%s", mdtname)) < 0) + return rc; + } else { + if ((rc = get_mdtname((char *)device, "%s%s", mdtname)) < 0) + return rc; + } - /* Use either the mdd changelog (preferred) or a client mdc changelog */ - if (get_mdtname(mdtname, - "/proc/fs/lustre/md[cd]/%s%s{,-mdc-*}/changelog", - path) < 0) - return -EINVAL; - rc = first_match(path, path); + /* Find corresponding mdc trigger */ + snprintf(pattern, PATH_MAX, + "/proc/fs/lustre/mdc/%s-*/changelog_trigger", mdtname); + rc = first_match(pattern, trigger); if (rc) return rc; - if ((fd = open(path, O_RDONLY)) < 0) { - llapi_err(LLAPI_MSG_ERROR, "error: can't open |%s|\n", path); - return -errno; - } - - rc = lseek(fd, (off_t)startrec, SEEK_SET); - if (rc < 0) { - llapi_err(LLAPI_MSG_ERROR, "can't seek rc=%d\n", rc); + /* Make sure we can write the trigger */ + fd = open(trigger, O_WRONLY); + if (fd < 0) return -errno; - } + /* Set up the receiver control struct */ cp = malloc(sizeof(*cp)); if (cp == NULL) { close(fd); @@ -2710,11 +2712,39 @@ int llapi_changelog_start(void **priv, int flags, const char *device, } cp->magic = CHANGELOG_PRIV_MAGIC; - cp->fd = fd; cp->flags = flags; - *priv = cp; + /* Start the receiver */ + rc = libcfs_ulnl_start(&cp->lnl, 0 /* unicast */); + if (rc < 0) + goto out_free; + + /* We need to trigger Lustre to start sending messages now. + We could send a lnl message to a kernel listener, + or write into proc. Proc has the advantage of running in this + context, avoiding the need for a kernel thread. */ + cs.cs_pid = getpid(); + cs.cs_startrec = startrec; + cs.cs_flags = flags & CHANGELOG_FLAG_BLOCK ? LNL_FL_BLOCK : 0; + if ((pid = fork()) < 0) { + goto out_free; + } else if (!pid) { + /* Write triggers Lustre to start sending, but it + won't return until it is complete, meaning everything + got shipped through lnl (or error). So we trigger it + from a child process here, allowing the llapi call to + return and wait for the lnl messages. */ + rc = write(fd, &cs, sizeof(cs)); + exit(rc); + } + close(fd); + *priv = cp; return 0; + +out_free: + free(cp); + close(fd); + return rc; } /** Finish reading from a changelog */ @@ -2725,22 +2755,12 @@ int llapi_changelog_fini(void **priv) if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC)) return -EINVAL; - close(cp->fd); + libcfs_ulnl_stop(&cp->lnl); free(cp); *priv = NULL; return 0; } -static int pollwait(int fd) { - struct pollfd pfds[1]; - int rc; - - pfds[0].fd = fd; - pfds[0].events = POLLIN; - rc = poll(pfds, 1, -1); - return rc < 0 ? -errno : rc; -} - /** Read the next changelog entry * @param priv Opaque private control structure * @param rech Changelog record handle; record will be allocated here @@ -2751,7 +2771,7 @@ static int pollwait(int fd) { int llapi_changelog_recv(void *priv, struct changelog_rec **rech) { struct changelog_private *cp = (struct changelog_private *)priv; - struct changelog_rec rec, *recp; + struct lnl_hdr *lnlh; int rc = 0; if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC)) @@ -2759,41 +2779,50 @@ int llapi_changelog_recv(void *priv, struct changelog_rec **rech) if (rech == NULL) return -EINVAL; -readrec: - /* Read in the rec to get the namelen */ - rc = read(cp->fd, &rec, sizeof(rec)); +repeat: + rc = libcfs_ulnl_msg_get(&cp->lnl, CR_MAXSIZE, LNL_TRANSPORT_CHANGELOG, + &lnlh); if (rc < 0) - return -errno; - if (rc == 0) { - if (cp->flags && CHANGELOG_FLAG_FOLLOW) { - rc = pollwait(cp->fd); - if (rc < 0) - return rc; - goto readrec; - } - return 1; + return rc; + + if ((lnlh->lnl_transport != LNL_TRANSPORT_CHANGELOG) || + ((lnlh->lnl_msgtype != CL_RECORD) && + (lnlh->lnl_msgtype != CL_EOF))) { + llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO, + "Unknown changelog message type %d:%d\n", + lnlh->lnl_transport, lnlh->lnl_msgtype); + rc = -EPROTO; + goto out_free; } - recp = malloc(sizeof(rec) + rec.cr_namelen); - if (recp == NULL) - return -ENOMEM; - memcpy(recp, &rec, sizeof(rec)); - rc = read(cp->fd, recp->cr_name, rec.cr_namelen); - if (rc < 0) { - free(recp); - llapi_err(LLAPI_MSG_ERROR, "Can't read entire filename"); - return -errno; + if (lnlh->lnl_msgtype == CL_EOF) { + if (cp->flags & CHANGELOG_FLAG_FOLLOW) { + /* Ignore EOFs */ + goto repeat; + } else { + rc = 1; + goto out_free; + } } - *rech = recp; + /* Our message is a changelog_rec */ + *rech = (struct changelog_rec *)(lnlh + 1); + return 0; + +out_free: + libcfs_ulnl_msg_free(&lnlh); + *rech = NULL; + return rc; } /** Release the changelog record when done with it. */ int llapi_changelog_free(struct changelog_rec **rech) { - if (*rech) - free(*rech); + if (*rech) { + struct lnl_hdr *lnlh = (struct lnl_hdr *)*rech - 1; + libcfs_ulnl_msg_free(&lnlh); + } *rech = NULL; return 0; } diff --git a/lustre/utils/lreplicate.c b/lustre/utils/lreplicate.c index da88342..b9aeb49 100644 --- a/lustre/utils/lreplicate.c +++ b/lustre/utils/lreplicate.c @@ -1057,6 +1057,9 @@ int lr_parse_line(void *priv, struct lr_info *info) strncpy(info->name, rec->cr_name, rec->cr_namelen); info->name[rec->cr_namelen] = '\0'; + if (verbose > 1) + printf("Rec %lld: %d %s\n", info->recno, info->type,info->name); + llapi_changelog_free(&rec); rec_count++; @@ -1387,8 +1390,8 @@ int lr_replicate() lr_print_status(info); /* Open changelogs for consumption*/ - rc = llapi_changelog_start(&changelog_priv, 0, status->ls_source_fs, - status->ls_last_recno); + rc = llapi_changelog_start(&changelog_priv, CHANGELOG_FLAG_BLOCK, + status->ls_source_fs, status->ls_last_recno); if (rc < 0) { fprintf(stderr, "Error opening changelog file for fs %s.\n", status->ls_source_fs);