__u16 lnl_magic;
__u8 lnl_transport; /* Each new Lustre feature should use a different
transport */
- __u8 padding1;
+ __u8 lnl_flags;
__u16 lnl_msgtype; /* Message type or opcode, transport-specific */
__u16 lnl_msglen;
} __attribute__((aligned(sizeof(__u64))));
#define LNL_MAGIC 0x191C /*Lustre9etLinC */
+#define LNL_FL_BLOCK 0x01 /* Wait for send */
/* lnl_msgtype values are defined in each transport */
enum lnl_transport_type {
- LNL_TRANSPORT_GENERIC = 1,
- LNL_TRANSPORT_HSM = 2,
+ LNL_TRANSPORT_GENERIC = 1,
+ LNL_TRANSPORT_HSM = 2,
+ LNL_TRANSPORT_CHANGELOG = 3,
};
enum lnl_generic_message_type {
/* LNL Broadcast Groups. This determines which userspace process hears which
* messages. Mutliple transports may be used within a group, or multiple
* groups may use the same transport. Broadcast
- * groups need not be used if e.g. a PID is specified instead.
- * Leaving group 0 unassigned at the moment.
+ * groups need not be used if e.g. a PID is specified instead;
+ * use group 0 to signify unicast.
*/
#define LNL_GRP_HSM 0x02
#define LNL_GRP_CNT 2
if (!skb)
return -ENOMEM;
- if (pid)
- rc = nlmsg_unicast(lnl_socket, skb, pid);
- else
+ if (pid) {
+ rc = netlink_unicast(lnl_socket, skb, pid,
+ lnlh->lnl_flags & LNL_FL_BLOCK ? 0 : MSG_DONTWAIT);
+ if (rc > 0)
+ rc = 0;
+ } else {
rc = nlmsg_multicast(lnl_socket, skb, 0, group);
+ }
CDEBUG(0, "Sent message pid=%d, group=%d, rc=%d\n", pid, group, rc);
}
/** Read a message from the netlink layer.
+ * Allocates memory, returns handle
*
* @param link Private descriptor for pipe/socket.
* @param maxsize Maximum message size allowed
/* Read message from kernel */
rc = recvmsg(*link, &msg, 0);
if (rc <= 0) {
- perror("recv");
rc = -errno;
break;
}
extern int lprocfs_quota_wr_qs_factor(struct file *file, const char *buffer,
unsigned long count, void *data);
-/** struct for holding changelog data for seq_file processing */
-struct changelog_seq_iter {
- void *csi_dev;
- struct llog_ctxt *csi_ctxt;
- struct llog_handle *csi_llh;
- __u64 csi_startrec;
- __u64 csi_endrec;
- loff_t csi_pos;
- int csi_wrote;
- int csi_startcat;
- int csi_startidx;
- int csi_fill:1;
- int csi_done:1;
-};
-int changelog_seq_open(struct inode *inode, struct file *file,
- struct changelog_seq_iter **csih);
-int changelog_seq_release(struct inode *inode, struct file *file);
-loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin);
-
#else
/* Changelog interface. priv is private state, managed internally
by these functions */
-#define CHANGELOG_FLAG_FOLLOW 0x01
+#define CHANGELOG_FLAG_FOLLOW 0x01 /* Not yet implemented */
+#define CHANGELOG_FLAG_BLOCK 0x02 /* Blocking IO makes sense in case of
+ slow user parsing of the records, but it also prevents us from cleaning
+ up if the records are not consumed. */
+
extern int llapi_changelog_start(void **priv, int flags, const char *mdtname,
long long startrec);
extern int llapi_changelog_fini(void **priv);
__u32 cs_id;
} __attribute__((packed));
+struct changelog_show {
+ __u64 cs_startrec;
+ __u32 cs_pid;
+ __u32 cs_flags;
+} __attribute__((packed));
+
/** changelog record */
struct llog_changelog_rec {
struct llog_rec_hdr cr_hdr;
#define CLF_FLAGMASK 0x0FFF
/* Anything under the flagmask may be per-type (if desired) */
+#define CR_MAXSIZE (PATH_MAX + sizeof(struct changelog_rec))
struct changelog_rec {
__u16 cr_namelen;
__u16 cr_flags; /**< (flags&CLF_FLAGMASK)|CLF_VERSION */
__u64 icc_recno;
};
+enum changelog_message_type {
+ CL_RECORD = 10, /* message is a changelog_rec */
+ CL_EOF = 11, /* at end of current changelog */
+};
/********* Misc **********/
return count;
}
-static int mdc_changelog_seq_release(struct inode *inode, struct file *file)
+static struct lnl_hdr *changelog_lnl_alloc(int len, int flags)
{
- struct seq_file *seq = file->private_data;
- struct changelog_seq_iter *csi = seq->private;
+ struct lnl_hdr *lh;
- if (csi && csi->csi_llh)
- llog_cat_put(csi->csi_llh);
- if (csi && csi->csi_ctxt)
- llog_ctxt_put(csi->csi_ctxt);
+ OBD_ALLOC(lh, len);
+ if (lh == NULL)
+ RETURN(NULL);
- return (changelog_seq_release(inode, file));
+ lh->lnl_magic = LNL_MAGIC;
+ lh->lnl_transport = LNL_TRANSPORT_CHANGELOG;
+ lh->lnl_flags = flags;
+ lh->lnl_msgtype = CL_RECORD;
+ lh->lnl_msglen = len;
+ return lh;
}
-static int mdc_changelog_seq_open(struct inode *inode, struct file *file)
+#define D_CHANGELOG 0
+
+static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
+ void *data)
{
- struct changelog_seq_iter *csi;
- int rc;
+ struct changelog_show *cs = data;
+ struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
+ struct lnl_hdr *lh;
+ int len, rc;
ENTRY;
- rc = changelog_seq_open(inode, file, &csi);
- if (rc)
- RETURN(rc);
+ if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
+ (rec->cr.cr_type >= CL_LAST)) {
+ CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type,
+ rec->cr.cr_type);
+ RETURN(-EINVAL);
+ }
+
+ if (rec->cr.cr_index < cs->cs_startrec) {
+ /* Skip entries earlier than what we are interested in */
+ CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
+ rec->cr.cr_index, cs->cs_startrec);
+ RETURN(0);
+ }
+
+ CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
+ " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
+ changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
+ rec->cr.cr_flags & CLF_FLAGMASK,
+ PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
+ rec->cr.cr_namelen, rec->cr.cr_name);
+
+ len = sizeof(*lh) + sizeof(rec->cr) + rec->cr.cr_namelen;
+
+ /* Set up the netlink message */
+ lh = changelog_lnl_alloc(len, cs->cs_flags);
+ if (lh == NULL)
+ RETURN(-ENOMEM);
+ memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
+
+ rc = libcfs_klnl_msg_put(cs->cs_pid, 0, lh);
+ CDEBUG(D_CHANGELOG, "nlmsg pid %d len %d rc %d\n", cs->cs_pid, len, rc);
+
+ OBD_FREE(lh, len);
+
+ RETURN(rc);
+}
+
+static int lproc_mdc_wr_changelog(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *obd = data;
+ struct llog_ctxt *ctxt;
+ struct llog_handle *llh;
+ struct lnl_hdr *lnlh;
+ struct changelog_show cs = {};
+ int rc;
+
+ CDEBUG(D_CHANGELOG, "file pid %d\n", file->f_owner.pid);
+
+ if (count != sizeof(cs))
+ return -EINVAL;
+
+ if (copy_from_user(&cs, buffer, sizeof(cs)))
+ return -EFAULT;
+
+ CDEBUG(D_CHANGELOG, "changelog to pid=%d(%d) start "LPU64"\n",
+ cs.cs_pid, file->f_owner.pid, cs.cs_startrec);
/* Set up the remote catalog handle */
- /* Note the proc file is set up with obd in data, not mdc_device */
- csi->csi_ctxt = llog_get_context((struct obd_device *)csi->csi_dev,
- LLOG_CHANGELOG_REPL_CTXT);
- if (csi->csi_ctxt == NULL)
- GOTO(out, rc = -ENOENT);
- rc = llog_create(csi->csi_ctxt, &csi->csi_llh, NULL, CHANGELOG_CATALOG);
+ ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
+ if (ctxt == NULL)
+ RETURN(-ENOENT);
+ rc = llog_create(ctxt, &llh, NULL, CHANGELOG_CATALOG);
if (rc) {
CERROR("llog_create() failed %d\n", rc);
GOTO(out, rc);
}
- rc = llog_init_handle(csi->csi_llh, LLOG_F_IS_CAT, NULL);
+ rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
if (rc) {
CERROR("llog_init_handle failed %d\n", rc);
GOTO(out, rc);
}
+ rc = llog_cat_process(llh, changelog_show_cb, &cs, 0, 0);
+
+ /* Send EOF */
+ if ((lnlh = changelog_lnl_alloc(sizeof(*lnlh), cs.cs_flags))) {
+ lnlh->lnl_msgtype = CL_EOF;
+ libcfs_klnl_msg_put(cs.cs_pid, 0, lnlh);
+ OBD_FREE(lnlh, sizeof(*lnlh));
+ }
+
out:
- if (rc)
- mdc_changelog_seq_release(inode, file);
- RETURN(rc);
+ if (llh)
+ llog_cat_put(llh);
+ if (ctxt)
+ llog_ctxt_put(ctxt);
+ if (rc < 0)
+ return rc;
+ return count;
}
-static struct file_operations mdc_changelog_fops = {
- .owner = THIS_MODULE,
- .open = mdc_changelog_seq_open,
- .read = seq_read,
- .llseek = changelog_seq_lseek,
- .release = mdc_changelog_seq_release,
-};
+/* temporary for testing */
+static int mdc_wr_netlink(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *obd = data;
+ struct lnl_hdr *lh;
+ struct hsm_action_list *hal;
+ struct hsm_action_item *hai;
+ int len;
+ int pid, rc;
+
+ rc = lprocfs_write_helper(buffer, count, &pid);
+ if (rc)
+ return rc;
+ if (pid < 0)
+ return -ERANGE;
+ CWARN("message to pid %d\n", pid);
+
+ len = sizeof(*lh) + sizeof(*hal) + MTI_NAME_MAXLEN +
+ /* for mockup below */ 2 * size_round(sizeof(*hai));
+
+ OBD_ALLOC(lh, len);
+
+ lh->lnl_magic = LNL_MAGIC;
+ lh->lnl_transport = LNL_TRANSPORT_HSM;
+ lh->lnl_msgtype = HMT_ACTION_LIST;
+ lh->lnl_msglen = len;
+
+ hal = (struct hsm_action_list *)(lh + 1);
+ hal->hal_version = HAL_VERSION;
+ hal->hal_archive_num = 1;
+ obd_uuid2fsname(hal->hal_fsname, obd->obd_name, MTI_NAME_MAXLEN);
+
+ /* mock up an action list */
+ hal->hal_count = 2;
+ hai = hai_zero(hal);
+ hai->hai_action = HSMA_ARCHIVE;
+ hai->hai_fid.f_oid = 5;
+ hai->hai_len = sizeof(*hai);
+ hai = hai_next(hai);
+ hai->hai_action = HSMA_RESTORE;
+ hai->hai_fid.f_oid = 10;
+ hai->hai_len = sizeof(*hai);
+
+ /* This works for either broadcast or unicast to a single pid */
+ rc = libcfs_klnl_msg_put(pid, pid == 0 ? LNL_GRP_HSM : 0, lh);
+
+ OBD_FREE(lh, len);
+ if (rc < 0)
+ return rc;
+ return count;
+}
static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "timeouts", lprocfs_rd_timeouts, 0, 0 },
{ "import", lprocfs_rd_import, 0, 0 },
{ "state", lprocfs_rd_state, 0, 0 },
- { "changelog", 0, 0, 0, &mdc_changelog_fops, 0400 },
+ { "changelog_trigger",0,lproc_mdc_wr_changelog, 0 },
+ { "hsm_nl", 0, mdc_wr_netlink, 0, 0, 0222 },
{ 0 }
};
/* ignore errors */
libcfs_klnl_start(LNL_TRANSPORT_HSM);
+ libcfs_klnl_start(LNL_TRANSPORT_CHANGELOG);
RETURN(rc);
struct client_obd *cli = &obd->u.cli;
libcfs_klnl_stop(LNL_TRANSPORT_HSM, LNL_GRP_HSM);
+ libcfs_klnl_stop(LNL_TRANSPORT_CHANGELOG, 0);
OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
return cucb.idx;
}
-/* non-seq version for direct calling by class_process_proc_param */
-static int mdd_changelog_write(struct file *file, const char *buffer,
- unsigned long count, void *data)
-{
- struct mdd_device *mdd = (struct mdd_device *)data;
- char kernbuf[32];
- char *end;
- int rc;
-
- if (count > (sizeof(kernbuf) - 1))
- goto out_usage;
-
- count = min_t(unsigned long, count, sizeof(kernbuf));
- if (copy_from_user(kernbuf, buffer, count))
- return -EFAULT;
-
- kernbuf[count] = '\0';
- /* strip trailing newline from "echo blah" */
- if (kernbuf[count - 1] == '\n')
- kernbuf[count - 1] = '\0';
-
- /* Forced on/off/purge rec, independent of changelog users! */
- if (strcmp(kernbuf, "on") == 0) {
- rc = mdd_changelog_on(mdd, 1);
- } else if (strcmp(kernbuf, "off") == 0) {
- rc = mdd_changelog_on(mdd, 0);
- } else {
- /* purge to an index */
- long long unsigned endrec;
-
- endrec = (long long)simple_strtoull(kernbuf, &end, 0);
- if (end == kernbuf)
- goto out_usage;
-
- LCONSOLE_INFO("changelog purge to %llu\n", endrec);
-
- rc = mdd_changelog_llog_cancel(mdd, endrec);
- }
-
- if (rc < 0)
- return rc;
- return count;
-
-out_usage:
- CWARN("changelog write usage: [on|off] | <purge_idx (0=all)>\n");
- return -EINVAL;
-}
-
-static ssize_t mdd_changelog_seq_write(struct file *file, const char *buffer,
- size_t count, loff_t *off)
-{
- struct seq_file *seq = file->private_data;
- struct changelog_seq_iter *csi = seq->private;
- struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
-
- return mdd_changelog_write(file, buffer, count, mdd);
-}
-
-static int mdd_changelog_done(struct changelog_seq_iter *csi)
-{
- struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
- int done = 0;
-
- spin_lock(&mdd->mdd_cl.mc_lock);
- done = (csi->csi_endrec >= mdd->mdd_cl.mc_index);
- spin_unlock(&mdd->mdd_cl.mc_lock);
- return done;
-}
-
-/* handle nonblocking */
-static ssize_t mdd_changelog_seq_read(struct file *file, char __user *buf,
- size_t count, loff_t *ppos)
-{
- struct seq_file *seq = (struct seq_file *)file->private_data;
- struct changelog_seq_iter *csi = seq->private;
- int rc;
- ENTRY;
-
- if ((file->f_flags & O_NONBLOCK) && mdd_changelog_done(csi))
- RETURN(-EAGAIN);
-
- csi->csi_done = 0;
- rc = seq_read(file, buf, count, ppos);
- RETURN(rc);
-}
-
-/* handle nonblocking */
-static unsigned int mdd_changelog_seq_poll(struct file *file, poll_table *wait)
-{
- struct seq_file *seq = (struct seq_file *)file->private_data;
- struct changelog_seq_iter *csi = seq->private;
- struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
- ENTRY;
-
- csi->csi_done = 0;
- poll_wait(file, &mdd->mdd_cl.mc_waitq, wait);
- if (!mdd_changelog_done(csi))
- RETURN(POLLIN | POLLRDNORM);
-
- RETURN(0);
-}
-
-static int mdd_changelog_seq_open(struct inode *inode, struct file *file)
-{
- struct changelog_seq_iter *csi;
- struct obd_device *obd;
- int rc;
- ENTRY;
-
- rc = changelog_seq_open(inode, file, &csi);
- if (rc)
- RETURN(rc);
-
- /* The proc file is set up with mdd in data, not obd */
- obd = mdd2obd_dev((struct mdd_device *)csi->csi_dev);
- csi->csi_ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
- if (csi->csi_ctxt == NULL) {
- changelog_seq_release(inode, file);
- RETURN(-ENOENT);
- }
- /* The handle is set up in llog_obd_origin_setup */
- csi->csi_llh = csi->csi_ctxt->loc_handle;
- RETURN(rc);
-}
-
-static int mdd_changelog_seq_release(struct inode *inode, struct file *file)
-{
- struct seq_file *seq = file->private_data;
- struct changelog_seq_iter *csi = seq->private;
-
- if (csi && csi->csi_ctxt)
- llog_ctxt_put(csi->csi_ctxt);
-
- return (changelog_seq_release(inode, file));
-}
-
-/* mdd changelog proc can handle nonblocking ops and writing to purge recs */
-struct file_operations mdd_changelog_fops = {
- .owner = THIS_MODULE,
- .open = mdd_changelog_seq_open,
- .read = mdd_changelog_seq_read,
- .write = mdd_changelog_seq_write,
- .llseek = changelog_seq_lseek,
- .poll = mdd_changelog_seq_poll,
- .release = mdd_changelog_seq_release,
-};
-
#ifdef HAVE_QUOTA_SUPPORT
static int mdd_lprocfs_quota_rd_type(char *page, char **start, off_t off,
int count, int *eof, void *data)
{ "changelog_mask", lprocfs_rd_changelog_mask,
lprocfs_wr_changelog_mask, 0 },
{ "changelog_users", lprocfs_rd_changelog_users, 0, 0},
- { "changelog", 0, mdd_changelog_write, 0, &mdd_changelog_fops, 0600 },
#ifdef HAVE_QUOTA_SUPPORT
{ "quota_type", mdd_lprocfs_quota_rd_type,
mdd_lprocfs_quota_wr_type, 0 },
EXPORT_SYMBOL(lprocfs_obd_wr_recovery_maxtime);
-/**** Changelogs *****/
-#define D_CHANGELOG 0
-
-/* How many records per seq_show. Too small, we spawn llog_process threads
- too often; too large, we run out of buffer space */
-#define CHANGELOG_CHUNK_SIZE 100
-
-static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
- void *data)
-{
- struct seq_file *seq = (struct seq_file *)data;
- struct changelog_seq_iter *csi = seq->private;
- struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
- char *ptr;
- int cnt, rc;
- ENTRY;
-
- if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
- (rec->cr.cr_type >= CL_LAST)) {
- CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type,
- rec->cr.cr_type);
- RETURN(-EINVAL);
- }
-
- CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64" cat=%d:%d start=%d:%d\n",
- rec->cr.cr_index, csi->csi_startrec,
- llh->lgh_hdr->llh_cat_idx, llh->lgh_cur_idx,
- csi->csi_startcat, csi->csi_startidx);
-
- if (rec->cr.cr_index < csi->csi_startrec)
- /* Skip entries earlier than what we are interested in */
- RETURN(0);
- if (rec->cr.cr_index == csi->csi_startrec) {
- /* Remember where we started, since seq_read will re-read
- * the data when it reallocs space. Sigh, if only there was
- * a way to tell seq_file how big the buf should be in the
- * first place...
- */
- csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
- csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
- }
- if (csi->csi_wrote > CHANGELOG_CHUNK_SIZE) {
- /* Stop at some point with a reasonable seq_file buffer size.
- * Start from here the next time.
- */
- csi->csi_endrec = rec->cr.cr_index - 1;
- csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
- csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
- csi->csi_wrote = 0;
- RETURN(LLOG_PROC_BREAK);
- }
-
- CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
- " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
- changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
- rec->cr.cr_flags & CLF_FLAGMASK,
- PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
- rec->cr.cr_namelen, rec->cr.cr_name);
-
- cnt = sizeof(rec->cr) + rec->cr.cr_namelen;
- ptr = (char *)(&rec->cr);
- CDEBUG(D_CHANGELOG, "packed rec %d starting at %p\n", cnt, ptr);
- rc = 0;
- while ((cnt-- > 0) && (rc == 0)) {
- rc = seq_putc(seq, *ptr);
- ptr++;
- }
-
- if (rc < 0) {
- /* Ran out of room in the seq buffer. seq_read will dump
- * the whole buffer and re-seq_start with a larger one;
- * no point in continuing the llog_process */
- CDEBUG(D_CHANGELOG, "rec="LPU64" overflow "LPU64"<-"LPU64"\n",
- rec->cr.cr_index, csi->csi_startrec, csi->csi_endrec);
- csi->csi_endrec = csi->csi_startrec - 1;
- csi->csi_wrote = 0;
- RETURN(LLOG_PROC_BREAK);
- }
-
- csi->csi_wrote++;
- csi->csi_endrec = rec->cr.cr_index;
-
- RETURN(0);
-}
-
-static int changelog_seq_show(struct seq_file *seq, void *v)
-{
- struct changelog_seq_iter *csi = seq->private;
- int rc;
- ENTRY;
-
- if (csi->csi_fill) {
- /* seq_read wants more data to fill his buffer. But we already
- filled the buf as much as we cared to; force seq_read to
- accept that by padding with 0's */
- while (seq_putc(seq, 0) == 0);
- RETURN(0);
- }
-
- /* Since we have to restart the llog_cat_process for each chunk of the
- seq_ functions, start from where we left off. */
- rc = llog_cat_process(csi->csi_llh, changelog_show_cb, seq,
- csi->csi_startcat, csi->csi_startidx);
-
- CDEBUG(D_CHANGELOG,"seq_show "LPU64"-"LPU64" cat=%d:%d wrote=%d rc=%d\n",
- csi->csi_startrec, csi->csi_endrec, csi->csi_startcat,
- csi->csi_startidx, csi->csi_wrote, rc);
-
- if (rc == 0)
- csi->csi_done = 1;
- if (rc == LLOG_PROC_BREAK)
- /* more records left, but seq_show must return 0 */
- rc = 0;
- RETURN(rc);
-}
-
-static void *changelog_seq_start(struct seq_file *seq, loff_t *pos)
-{
- struct changelog_seq_iter *csi = seq->private;
- LASSERT(csi);
-
- CDEBUG(D_CHANGELOG, "start "LPU64"-"LPU64" pos="LPU64"\n",
- csi->csi_startrec, csi->csi_endrec, *pos);
-
- csi->csi_fill = 0;
-
- if (csi->csi_done)
- /* no more records, seq_read should return 0 if buffer
- is empty */
- return NULL;
-
- if (*pos > csi->csi_pos) {
- /* The seq_read implementation sucks. It may call start
- multiple times, using pos to indicate advances, if any,
- by arbitrarily increasing it by 1. So ignore the actual
- value of pos, and just register any increase as
- "seq_read wants the next values". */
- csi->csi_startrec = csi->csi_endrec + 1;
- csi->csi_pos = *pos;
- }
- /* else use old startrec/startidx */
-
- return csi;
-}
-
-static void changelog_seq_stop(struct seq_file *seq, void *v)
-{
- struct changelog_seq_iter *csi = seq->private;
-
- CDEBUG(D_CHANGELOG, "stop "LPU64"-"LPU64"\n",
- csi->csi_startrec, csi->csi_endrec);
-}
-
-static void *changelog_seq_next(struct seq_file *seq, void *v, loff_t *pos)
-{
- struct changelog_seq_iter *csi = seq->private;
-
- CDEBUG(D_CHANGELOG, "next "LPU64"-"LPU64" pos="LPU64"\n",
- csi->csi_startrec, csi->csi_endrec, *pos);
-
- csi->csi_fill = 1;
-
- return csi;
-}
-
-static struct seq_operations changelog_sops = {
- .start = changelog_seq_start,
- .stop = changelog_seq_stop,
- .next = changelog_seq_next,
- .show = changelog_seq_show,
-};
-
-int changelog_seq_open(struct inode *inode, struct file *file,
- struct changelog_seq_iter **csih)
-{
- struct changelog_seq_iter *csi;
- struct proc_dir_entry *dp = PDE(inode);
- struct seq_file *seq;
- int rc;
-
- LPROCFS_ENTRY_AND_CHECK(dp);
-
- rc = seq_open(file, &changelog_sops);
- if (rc) {
- LPROCFS_EXIT();
- return rc;
- }
-
- OBD_ALLOC_PTR(csi);
- if (csi == NULL) {
- lprocfs_seq_release(inode, file);
- return -ENOMEM;
- }
-
- csi->csi_dev = dp->data;
- seq = file->private_data;
- seq->private = csi;
- *csih = csi;
-
- return rc;
-}
-EXPORT_SYMBOL(changelog_seq_open);
-
-int changelog_seq_release(struct inode *inode, struct file *file)
-{
- struct seq_file *seq = file->private_data;
- struct changelog_seq_iter *csi = seq->private;
-
- if (csi)
- OBD_FREE_PTR(csi);
-
- return lprocfs_seq_release(inode, file);
-}
-EXPORT_SYMBOL(changelog_seq_release);
-
-#ifndef SEEK_CUR /* SLES10 needs this */
-#define SEEK_CUR 1
-#define SEEK_END 2
-#endif
-
-loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin)
-{
- struct seq_file *seq = (struct seq_file *)file->private_data;
- struct changelog_seq_iter *csi = seq->private;
-
- CDEBUG(D_CHANGELOG,"seek "LPU64"-"LPU64" off="LPU64":%d fpos="LPU64"\n",
- csi->csi_startrec, csi->csi_endrec, offset, origin, file->f_pos);
-
- LL_SEQ_LOCK(seq);
-
- switch (origin) {
- case SEEK_CUR:
- offset += csi->csi_endrec;
- break;
- case SEEK_END:
- /* we don't know the last rec */
- offset = -1;
- }
-
- /* SEEK_SET */
-
- if (offset < 0) {
- LL_SEQ_UNLOCK(seq);
- return -EINVAL;
- }
-
- csi->csi_startrec = offset;
- csi->csi_endrec = offset ? offset - 1 : 0;
-
- /* drop whatever is left in sucky seq_read's buffer */
- seq->count = 0;
- seq->from = 0;
- seq->index++;
- LL_SEQ_UNLOCK(seq);
- file->f_pos = csi->csi_startrec;
- return csi->csi_startrec;
-}
-EXPORT_SYMBOL(changelog_seq_lseek);
-
EXPORT_SYMBOL(lprocfs_register);
EXPORT_SYMBOL(lprocfs_srch);
EXPORT_SYMBOL(lprocfs_remove);
fini_changelog
cleanup_src_tgt
return $RC
-
}
run_test 1 "Simple Replication"
createmany -o $DIR/$tdir/$tfile $NUMFILES
# To simulate replication to another lustre filesystem, replicate
- # the changes to $DIR/tgt. Disable changelogs before replication
- # so that the files created as part of replication are not logged.
- do_facet $SINGLEMDS lctl set_param -n mdd.$MDT0.changelog off
+ # the changes to $DIR/tgt. We can't turn off the changelogs
+ # while we are registered, so lreplicate better not try to
+ # replicate the replication steps. It seems ok :)
mkdir $DIR/tgt
$LREPLICATE -s $DIR -t $DIR/tgt -m $MDT0 -u $CL_USER -l $LREPL_LOG
"usage: ls [OPTION]... [FILE]..."},
{"changelog", lfs_changelog, 0,
"Show the metadata changes on an MDT."
- "\nusage: changelog [--follow] <mdtname> [startrec [endrec]]"
- "\n(note: --follow is only valid when run on MDT node)"},
+ "\nusage: changelog <mdtname> [startrec [endrec]]"},
{"changelog_clear", lfs_changelog_clear, 0,
"Indicate that old changelog records up to <endrec> are no longer of "
"interest to consumer <id>, allowing the system to free up space.\n"
rec->cr_namelen, rec->cr_name);
else
printf("\n");
+
llapi_changelog_free(&rec);
}
llapi_changelog_fini(&changelog_priv);
+ if (rc < 0)
+ fprintf(stderr, "Changelog: %s\n", strerror(errno = -rc));
+
return (rc == 1 ? 0 : rc);
}
#define CHANGELOG_PRIV_MAGIC 0xCA8E1080
struct changelog_private {
int magic;
- int fd;
int flags;
+ lustre_netlink lnl;
};
/** Start reading from a changelog
* @param priv Opaque private control structure
- * @param flags Start flags (e.g. follow)
+ * @param flags Start flags (e.g. CHANGELOG_FLAG_BLOCK)
* @param device Report changes recorded on this MDT
* @param startrec Report changes beginning with this record number
+ * (just call llapi_changelog_fini when done; don't need an endrec)
*/
int llapi_changelog_start(void **priv, int flags, const char *device,
long long startrec)
{
struct changelog_private *cp;
- char path[256];
+ struct changelog_show cs = {};
char mdtname[20];
- int rc, fd;
+ char pattern[100];
+ char trigger[100];
+ int fd, rc, pid;
- if (device[0] == '/')
- rc = llapi_search_fsname(device, mdtname);
- else
- strncpy(mdtname, device, sizeof(mdtname));
+ /* Find mdtname from path, fsname, mdtname, or mdtname_UUID */
+ if (device[0] == '/') {
+ if ((rc = llapi_search_fsname(device, mdtname)))
+ return rc;
+ if ((rc = get_mdtname(mdtname, "%s%s", mdtname)) < 0)
+ return rc;
+ } else {
+ if ((rc = get_mdtname((char *)device, "%s%s", mdtname)) < 0)
+ return rc;
+ }
- /* Use either the mdd changelog (preferred) or a client mdc changelog */
- if (get_mdtname(mdtname,
- "/proc/fs/lustre/md[cd]/%s%s{,-mdc-*}/changelog",
- path) < 0)
- return -EINVAL;
- rc = first_match(path, path);
+ /* Find corresponding mdc trigger */
+ snprintf(pattern, PATH_MAX,
+ "/proc/fs/lustre/mdc/%s-*/changelog_trigger", mdtname);
+ rc = first_match(pattern, trigger);
if (rc)
return rc;
- if ((fd = open(path, O_RDONLY)) < 0) {
- llapi_err(LLAPI_MSG_ERROR, "error: can't open |%s|\n", path);
- return -errno;
- }
-
- rc = lseek(fd, (off_t)startrec, SEEK_SET);
- if (rc < 0) {
- llapi_err(LLAPI_MSG_ERROR, "can't seek rc=%d\n", rc);
+ /* Make sure we can write the trigger */
+ fd = open(trigger, O_WRONLY);
+ if (fd < 0)
return -errno;
- }
+ /* Set up the receiver control struct */
cp = malloc(sizeof(*cp));
if (cp == NULL) {
close(fd);
}
cp->magic = CHANGELOG_PRIV_MAGIC;
- cp->fd = fd;
cp->flags = flags;
- *priv = cp;
+ /* Start the receiver */
+ rc = libcfs_ulnl_start(&cp->lnl, 0 /* unicast */);
+ if (rc < 0)
+ goto out_free;
+
+ /* We need to trigger Lustre to start sending messages now.
+ We could send a lnl message to a kernel listener,
+ or write into proc. Proc has the advantage of running in this
+ context, avoiding the need for a kernel thread. */
+ cs.cs_pid = getpid();
+ cs.cs_startrec = startrec;
+ cs.cs_flags = flags & CHANGELOG_FLAG_BLOCK ? LNL_FL_BLOCK : 0;
+ if ((pid = fork()) < 0) {
+ goto out_free;
+ } else if (!pid) {
+ /* Write triggers Lustre to start sending, but it
+ won't return until it is complete, meaning everything
+ got shipped through lnl (or error). So we trigger it
+ from a child process here, allowing the llapi call to
+ return and wait for the lnl messages. */
+ rc = write(fd, &cs, sizeof(cs));
+ exit(rc);
+ }
+ close(fd);
+ *priv = cp;
return 0;
+
+out_free:
+ free(cp);
+ close(fd);
+ return rc;
}
/** Finish reading from a changelog */
if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC))
return -EINVAL;
- close(cp->fd);
+ libcfs_ulnl_stop(&cp->lnl);
free(cp);
*priv = NULL;
return 0;
}
-static int pollwait(int fd) {
- struct pollfd pfds[1];
- int rc;
-
- pfds[0].fd = fd;
- pfds[0].events = POLLIN;
- rc = poll(pfds, 1, -1);
- return rc < 0 ? -errno : rc;
-}
-
/** Read the next changelog entry
* @param priv Opaque private control structure
* @param rech Changelog record handle; record will be allocated here
int llapi_changelog_recv(void *priv, struct changelog_rec **rech)
{
struct changelog_private *cp = (struct changelog_private *)priv;
- struct changelog_rec rec, *recp;
+ struct lnl_hdr *lnlh;
int rc = 0;
if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC))
if (rech == NULL)
return -EINVAL;
-readrec:
- /* Read in the rec to get the namelen */
- rc = read(cp->fd, &rec, sizeof(rec));
+repeat:
+ rc = libcfs_ulnl_msg_get(&cp->lnl, CR_MAXSIZE, LNL_TRANSPORT_CHANGELOG,
+ &lnlh);
if (rc < 0)
- return -errno;
- if (rc == 0) {
- if (cp->flags && CHANGELOG_FLAG_FOLLOW) {
- rc = pollwait(cp->fd);
- if (rc < 0)
- return rc;
- goto readrec;
- }
- return 1;
+ return rc;
+
+ if ((lnlh->lnl_transport != LNL_TRANSPORT_CHANGELOG) ||
+ ((lnlh->lnl_msgtype != CL_RECORD) &&
+ (lnlh->lnl_msgtype != CL_EOF))) {
+ llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
+ "Unknown changelog message type %d:%d\n",
+ lnlh->lnl_transport, lnlh->lnl_msgtype);
+ rc = -EPROTO;
+ goto out_free;
}
- recp = malloc(sizeof(rec) + rec.cr_namelen);
- if (recp == NULL)
- return -ENOMEM;
- memcpy(recp, &rec, sizeof(rec));
- rc = read(cp->fd, recp->cr_name, rec.cr_namelen);
- if (rc < 0) {
- free(recp);
- llapi_err(LLAPI_MSG_ERROR, "Can't read entire filename");
- return -errno;
+ if (lnlh->lnl_msgtype == CL_EOF) {
+ if (cp->flags & CHANGELOG_FLAG_FOLLOW) {
+ /* Ignore EOFs */
+ goto repeat;
+ } else {
+ rc = 1;
+ goto out_free;
+ }
}
- *rech = recp;
+ /* Our message is a changelog_rec */
+ *rech = (struct changelog_rec *)(lnlh + 1);
+
return 0;
+
+out_free:
+ libcfs_ulnl_msg_free(&lnlh);
+ *rech = NULL;
+ return rc;
}
/** Release the changelog record when done with it. */
int llapi_changelog_free(struct changelog_rec **rech)
{
- if (*rech)
- free(*rech);
+ if (*rech) {
+ struct lnl_hdr *lnlh = (struct lnl_hdr *)*rech - 1;
+ libcfs_ulnl_msg_free(&lnlh);
+ }
*rech = NULL;
return 0;
}
strncpy(info->name, rec->cr_name, rec->cr_namelen);
info->name[rec->cr_namelen] = '\0';
+ if (verbose > 1)
+ printf("Rec %lld: %d %s\n", info->recno, info->type,info->name);
+
llapi_changelog_free(&rec);
rec_count++;
lr_print_status(info);
/* Open changelogs for consumption*/
- rc = llapi_changelog_start(&changelog_priv, 0, status->ls_source_fs,
- status->ls_last_recno);
+ rc = llapi_changelog_start(&changelog_priv, CHANGELOG_FLAG_BLOCK,
+ status->ls_source_fs, status->ls_last_recno);
if (rc < 0) {
fprintf(stderr, "Error opening changelog file for fs %s.\n",
status->ls_source_fs);