Whamcloud - gitweb
LU-7659 mdc: expose changelog through char devices 00/18900/15
authorHenri Doreau <henri.doreau@cea.fr>
Wed, 13 Jan 2016 14:53:00 +0000 (15:53 +0100)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 6 Apr 2017 13:46:25 +0000 (13:46 +0000)
Register one character device per MDT so that changelog records can be
read without going through llapi, and to make record delivery more efficient.

- open() spawns a thread to prefetch records and enqueue them into a
  local buffer (unless the device is open in write-only mode).
- lseek() can be used to jump to a specific record, in which case the
  offset is a record number (with SEEK_SET) or a number of records to
  skip (with SEEK_CUR). Seeking is only possible in the forward direction.
- read() copies whole records to userland. Records are never truncated,
  so short reads are likely.
- write() is used to transmit control commands to the device.
  The only available one is changelog_clear, which is done by writing
  "clear:cl<user>:<recno>" into the device.
- close() terminates the prefetch thread if any, and releases resources.

It is possible to poll() on the device to get notified when new records
are available for read.

Signed-off-by: Henri Doreau <henri.doreau@cea.fr>
Change-Id: I14709fdbac76b5512e58099e4e536cf9c973868c
Reviewed-on: https://review.whamcloud.com/18900
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
15 files changed:
lustre/include/lustre/lustre_user.h
lustre/include/lustre/lustreapi.h
lustre/include/lustre_ioctl.h
lustre/include/obd.h
lustre/include/uapi_kernelcomm.h
lustre/ldlm/ldlm_lib.c
lustre/llite/dir.c
lustre/lmv/lmv_obd.c
lustre/mdc/Makefile.in
lustre/mdc/mdc_changelog.c [new file with mode: 0644]
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_request.c
lustre/utils/Makefile.am
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_chlg.c [new file with mode: 0644]

index 6b33e95..52f59ad 100644 (file)
@@ -992,13 +992,6 @@ static inline void changelog_remap_rec(struct changelog_rec *rec,
        rec->cr_flags = (rec->cr_flags & CLF_FLAGMASK) | crf_wanted;
 }
 
-struct ioc_changelog {
-        __u64 icc_recno;
-        __u32 icc_mdtindex;
-        __u32 icc_id;
-        __u32 icc_flags;
-};
-
 enum changelog_message_type {
         CL_RECORD = 10, /* message is a changelog_rec */
         CL_EOF    = 11, /* at end of current changelog */
index 6c0a17f..07484ca 100644 (file)
@@ -347,6 +347,7 @@ extern int llapi_changelog_start(void **priv, enum changelog_send_flag flags,
 extern int llapi_changelog_fini(void **priv);
 extern int llapi_changelog_recv(void *priv, struct changelog_rec **rech);
 extern int llapi_changelog_free(struct changelog_rec **rech);
+extern int llapi_changelog_get_fd(void *priv);
 /* Allow records up to endrec to be destroyed; requires registered id. */
 extern int llapi_changelog_clear(const char *mdtname, const char *idstr,
                                  long long endrec);
index fbd0f03..fa63359 100644 (file)
@@ -373,7 +373,7 @@ obd_ioctl_unpack(struct obd_ioctl_data *data, char *pbuf, int max_len)
 #define OBD_GET_VERSION                _IOWR('f', 144, OBD_IOC_DATA_TYPE)
 /*     OBD_IOC_GSS_SUPPORT     _IOWR('f', 145, OBD_IOC_DATA_TYPE) */
 /*     OBD_IOC_CLOSE_UUID      _IOWR('f', 147, OBD_IOC_DATA_TYPE) */
-#define OBD_IOC_CHANGELOG_SEND _IOW ('f', 148, OBD_IOC_DATA_TYPE)
+/*     OBD_IOC_CHANGELOG_SEND  _IOW ('f', 148, OBD_IOC_DATA_TYPE) */
 #define OBD_IOC_GETDEVICE      _IOWR('f', 149, OBD_IOC_DATA_TYPE)
 #define OBD_IOC_FID2PATH       _IOWR('f', 150, OBD_IOC_DATA_TYPE)
 /*     lustre/lustre_user.h    151-153 */
index b399149..455e232 100644 (file)
@@ -325,13 +325,15 @@ struct client_obd {
        struct lu_client_seq    *cl_seq;
        struct rw_semaphore      cl_seq_rwsem;
 
-       atomic_t             cl_resends; /* resend count */
+       atomic_t                 cl_resends; /* resend count */
 
        /* ptlrpc work for writeback in ptlrpcd context */
        void                    *cl_writeback_work;
        void                    *cl_lru_work;
        /* hash tables for osc_quota_info */
        struct cfs_hash         *cl_quota_hash[LL_MAXQUOTAS];
+       /* Links to the global list of registered changelog devices */
+       struct list_head         cl_chg_dev_linkage;
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
index 208002c..e8119f5 100644 (file)
@@ -51,7 +51,6 @@ struct kuc_hdr {
        __u16 kuc_msglen;     /* Including header */
 } __attribute__((aligned(sizeof(__u64))));
 
-#define KUC_CHANGELOG_MSG_MAXSIZE (sizeof(struct kuc_hdr)+CR_MAXSIZE)
 
 #define KUC_MAGIC  0x191C /*Lustre9etLinC */
 
@@ -59,7 +58,6 @@ struct kuc_hdr {
 enum kuc_transport_type {
        KUC_TRANSPORT_GENERIC   = 1,
        KUC_TRANSPORT_HSM       = 2,
-       KUC_TRANSPORT_CHANGELOG = 3,
 };
 
 enum kuc_generic_message_type {
index ced1198..268ae92 100644 (file)
@@ -436,6 +436,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
        init_waitqueue_head(&cli->cl_mod_rpcs_waitq);
        cli->cl_mod_tag_bitmap = NULL;
 
+       INIT_LIST_HEAD(&cli->cl_chg_dev_linkage);
+
        if (connect_op == MDS_CONNECT) {
                cli->cl_max_mod_rpcs_in_flight = cli->cl_max_rpcs_in_flight - 1;
                OBD_ALLOC(cli->cl_mod_tag_bitmap,
index b171f3e..afa1020 100644 (file)
@@ -1546,14 +1546,6 @@ out_rmdir:
                RETURN(obd_iocontrol(cmd, sbi->ll_md_exp, 0, NULL,
                                     (void __user *)arg));
         }
-        case OBD_IOC_CHANGELOG_SEND:
-        case OBD_IOC_CHANGELOG_CLEAR:
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                       RETURN(-EPERM);
-
-               rc = copy_and_ioctl(cmd, sbi->ll_md_exp, (void __user *)arg,
-                                    sizeof(struct ioc_changelog));
-                RETURN(rc);
        case OBD_IOC_FID2PATH:
                RETURN(ll_fid2path(inode, (void __user *)arg));
        case LL_IOC_GETPARENT:
index 74166fb..5be99ee 100644 (file)
@@ -985,19 +985,6 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 OBD_FREE_PTR(oqctl);
                 break;
         }
-        case OBD_IOC_CHANGELOG_SEND:
-        case OBD_IOC_CHANGELOG_CLEAR: {
-                struct ioc_changelog *icc = karg;
-
-                if (icc->icc_mdtindex >= count)
-                        RETURN(-ENODEV);
-
-               tgt = lmv->tgts[icc->icc_mdtindex];
-               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
-                       RETURN(-ENODEV);
-               rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
-               break;
-       }
        case LL_IOC_GET_CONNECT_FLAGS: {
                tgt = lmv->tgts[0];
                if (tgt == NULL || tgt->ltd_exp == NULL)
index f007298..d29d31e 100644 (file)
@@ -1,5 +1,10 @@
 MODULES := mdc
-mdc-objs := mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o mdc_locks.o
+mdc-objs :=    mdc_request.o \
+               mdc_reint.o \
+               lproc_mdc.o \
+               mdc_lib.o \
+               mdc_locks.o \
+               mdc_changelog.o
 
 EXTRA_DIST = $(mdc-objs:.o=.c) mdc_internal.h
 
diff --git a/lustre/mdc/mdc_changelog.c b/lustre/mdc/mdc_changelog.c
new file mode 100644 (file)
index 0000000..3ae27f9
--- /dev/null
@@ -0,0 +1,734 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies
+ *                     Alternatives.
+ *
+ * Author: Henri Doreau <henri.doreau@cea.fr>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDC
+
+#include <linux/init.h>
+#include <linux/kthread.h>
+#include <linux/poll.h>
+#include <linux/miscdevice.h>
+
+#include <lustre_log.h>
+
+#include "mdc_internal.h"
+
+
+/*
+ * -- Changelog delivery through character device --
+ */
+
+/**
+ * Mutex to protect chlg_registered_devices below
+ */
+static DEFINE_MUTEX(chlg_registered_dev_lock);
+
+/**
+ * Global linked list of all registered devices (one per MDT).
+ */
+static LIST_HEAD(chlg_registered_devices);
+
+
+struct chlg_registered_dev {
+       /* Device name of the form "changelog-{MDTNAME}" */
+       char                    ced_name[32];
+       /* Misc device descriptor */
+       struct miscdevice       ced_misc;
+       /* OBDs referencing this device (multiple mount points) */
+       struct list_head        ced_obds;
+       /* Reference counter for proper deregistration */
+       struct kref             ced_refs;
+       /* Link within the global chlg_registered_devices */
+       struct list_head        ced_link;
+};
+
+struct chlg_reader_state {
+       /* Shortcut to the corresponding OBD device */
+       struct obd_device       *crs_obd;
+       /* An error occurred that prevents reading further */
+       bool                     crs_err;
+       /* EOF, no more records available */
+       bool                     crs_eof;
+       /* Userland reader closed connection */
+       bool                     crs_closed;
+       /* Desired start position */
+       __u64                    crs_start_offset;
+       /* Wait queue for the catalog processing thread */
+       wait_queue_head_t        crs_waitq_prod;
+       /* Wait queue for the record copy threads */
+       wait_queue_head_t        crs_waitq_cons;
+       /* Mutex protecting crs_rec_count and crs_rec_queue */
+       struct mutex             crs_lock;
+       /* Number of items in the list */
+       __u64                    crs_rec_count;
+       /* List of prefetched chlg_rec_entry items, linked via enq_linkage */
+       struct list_head         crs_rec_queue;
+};
+
+struct chlg_rec_entry {
+       /* Link within the chlg_reader_state::crs_rec_queue list */
+       struct list_head        enq_linkage;
+       /* Data (enq_record) field length */
+       __u64                   enq_length;
+       /* Copy of a changelog record (see struct llog_changelog_rec) */
+       struct changelog_rec    enq_record[];
+};
+
+enum {
+       /* Number of records to prefetch locally. */
+       CDEV_CHLG_MAX_PREFETCH = 1024,
+};
+
+/**
+ * ChangeLog catalog processing callback invoked on each record.
+ * If the current record is eligible to userland delivery, push
+ * it into the crs_rec_queue where the consumer code will fetch it.
+ *
+ * @param[in]     env  (unused)
+ * @param[in]     llh  Client-side handle used to identify the llog
+ * @param[in]     hdr  Header of the current llog record
+ * @param[in,out] data chlg_reader_state passed from caller
+ *
+ * @return 0 or LLOG_PROC_* control code on success, negated error on failure.
+ */
+static int chlg_read_cat_process_cb(const struct lu_env *env,
+                                   struct llog_handle *llh,
+                                   struct llog_rec_hdr *hdr, void *data)
+{
+       struct llog_changelog_rec *rec;
+       struct chlg_reader_state *crs = data;
+       struct chlg_rec_entry *enq;
+       struct l_wait_info lwi = { 0 };
+       size_t len;
+       int rc;
+       ENTRY;
+
+       LASSERT(crs != NULL);
+       LASSERT(hdr != NULL);
+
+       rec = container_of(hdr, struct llog_changelog_rec, cr_hdr);
+
+       if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
+               rc = -EINVAL;
+               CERROR("%s: not a changelog rec %x/%d in llog "DFID" rc = %d\n",
+                      crs->crs_obd->obd_name, rec->cr_hdr.lrh_type,
+                      rec->cr.cr_type,
+                      PFID(lu_object_fid(&llh->lgh_obj->do_lu)), rc);
+               RETURN(rc);
+       }
+
+       /* Skip undesired records */
+       if (rec->cr.cr_index < crs->crs_start_offset)
+               RETURN(0);
+
+       CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
+              rec->cr.cr_index, rec->cr.cr_type,
+              changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
+              rec->cr.cr_flags & CLF_FLAGMASK,
+              PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
+              rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
+
+       l_wait_event(crs->crs_waitq_prod,
+                    (crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
+                     crs->crs_closed), &lwi);
+
+       if (crs->crs_closed)
+               RETURN(LLOG_PROC_BREAK);
+
+       len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
+       OBD_ALLOC(enq, sizeof(*enq) + len);
+       if (enq == NULL)
+               RETURN(-ENOMEM);
+
+       INIT_LIST_HEAD(&enq->enq_linkage);
+       enq->enq_length = len;
+       memcpy(enq->enq_record, &rec->cr, len);
+
+       mutex_lock(&crs->crs_lock);
+       list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue);
+       crs->crs_rec_count++;
+       mutex_unlock(&crs->crs_lock);
+
+       wake_up_all(&crs->crs_waitq_cons);
+
+       RETURN(0);
+}
+
+/**
+ * Remove record from the list it is attached to and free it.
+ */
+static void enq_record_delete(struct chlg_rec_entry *rec)
+{
+       list_del(&rec->enq_linkage);
+       OBD_FREE(rec, sizeof(*rec) + rec->enq_length);
+}
+
+/**
+ * Release resources associated with a chlg_reader_state instance.
+ *
+ * @param  crs  CRS instance to release.
+ */
+static void crs_free(struct chlg_reader_state *crs)
+{
+       struct chlg_rec_entry *rec;
+       struct chlg_rec_entry *tmp;
+
+       list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
+               enq_record_delete(rec);
+
+       OBD_FREE_PTR(crs);
+}
+
+/**
+ * Record prefetch thread entry point. Opens the changelog catalog and starts
+ * reading records.
+ *
+ * @param[in,out]  args  chlg_reader_state passed from caller.
+ * @return 0 on success, negated error code on failure.
+ */
+static int chlg_load(void *args)
+{
+       struct chlg_reader_state *crs = args;
+       struct obd_device *obd = crs->crs_obd;
+       struct llog_ctxt *ctx = NULL;
+       struct llog_handle *llh = NULL;
+       struct l_wait_info lwi = { 0 };
+       int rc;
+       ENTRY;
+
+       ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
+       if (ctx == NULL)
+               GOTO(err_out, rc = -ENOENT);
+
+       rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG,
+                      LLOG_OPEN_EXISTS);
+       if (rc) {
+               CERROR("%s: fail to open changelog catalog: rc = %d\n",
+                      obd->obd_name, rc);
+               GOTO(err_out, rc);
+       }
+
+       rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT|LLOG_F_EXT_JOBID, NULL);
+       if (rc) {
+               CERROR("%s: fail to init llog handle: rc = %d\n",
+                      obd->obd_name, rc);
+               GOTO(err_out, rc);
+       }
+
+       rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs, 0, 0);
+       if (rc < 0) {
+               CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc);
+               GOTO(err_out, rc);
+       }
+
+err_out:
+       crs->crs_err = true;
+       wake_up_all(&crs->crs_waitq_cons);
+
+       if (llh != NULL)
+               llog_cat_close(NULL, llh);
+
+       if (ctx != NULL)
+               llog_ctxt_put(ctx);
+
+       l_wait_event(crs->crs_waitq_prod, crs->crs_closed, &lwi);
+       crs_free(crs);
+       RETURN(rc);
+}
+
+/**
+ * Read handler, dequeues records from the chlg_reader_state if any.
+ * No partial records are copied to userland so this function can return less
+ * data than required (short read).
+ *
+ * @param[in]   file   File pointer to the character device.
+ * @param[out]  buff   Userland buffer where to copy the records.
+ * @param[in]   count  Userland buffer size.
+ * @param[out]  ppos   File position, updated with the index number of the next
+ *                    record to read.
+ * @return number of copied bytes on success, negated error code on failure.
+ */
+static ssize_t chlg_read(struct file *file, char __user *buff, size_t count,
+                        loff_t *ppos)
+{
+       struct chlg_reader_state *crs = file->private_data;
+       struct chlg_rec_entry *rec;
+       struct chlg_rec_entry *tmp;
+       struct l_wait_info lwi = { 0 };
+       ssize_t  written_total = 0;
+       LIST_HEAD(consumed);
+       ENTRY;
+
+       if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0)
+               RETURN(-EAGAIN);
+
+       l_wait_event(crs->crs_waitq_cons,
+                    crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err,
+                    &lwi);
+
+       mutex_lock(&crs->crs_lock);
+       list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
+               if (written_total + rec->enq_length > count)
+                       break;
+
+               if (copy_to_user(buff, rec->enq_record, rec->enq_length)) {
+                       if (written_total == 0)
+                               written_total = -EFAULT;
+                       break;
+               }
+
+               buff += rec->enq_length;
+               written_total += rec->enq_length;
+
+               crs->crs_rec_count--;
+               list_move_tail(&rec->enq_linkage, &consumed);
+
+               crs->crs_start_offset = rec->enq_record->cr_index + 1;
+       }
+       mutex_unlock(&crs->crs_lock);
+
+       if (written_total > 0)
+               wake_up_all(&crs->crs_waitq_prod);
+
+       list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage)
+               enq_record_delete(rec);
+
+       *ppos = crs->crs_start_offset;
+
+       RETURN(written_total);
+}
+
+/**
+ * Jump to a given record index. Helper for chlg_llseek().
+ *
+ * @param[in,out]  crs     Internal reader state.
+ * @param[in]      offset  Desired offset (record index).
+ * @return 0 on success, negated error code on failure.
+ */
+static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset)
+{
+       struct chlg_rec_entry *rec;
+       struct chlg_rec_entry *tmp;
+
+       mutex_lock(&crs->crs_lock);
+       if (offset < crs->crs_start_offset) {
+               mutex_unlock(&crs->crs_lock);
+               return -ERANGE;
+       }
+
+       crs->crs_start_offset = offset;
+       list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) {
+               struct changelog_rec *cr = rec->enq_record;
+
+               if (cr->cr_index >= crs->crs_start_offset)
+                       break;
+
+               crs->crs_rec_count--;
+               enq_record_delete(rec);
+       }
+
+       mutex_unlock(&crs->crs_lock);
+       wake_up_all(&crs->crs_waitq_prod);
+       return 0;
+}
+
+/**
+ * Move read pointer to a certain record index, encoded as an offset.
+ *
+ * @param[in,out] file   File pointer to the changelog character device
+ * @param[in]    off    Offset to skip, actually a record index, not byte count
+ * @param[in]    whence Relative/Absolute interpretation of the offset
+ * @return the resulting position on success or negated error code on failure.
+ */
+static loff_t chlg_llseek(struct file *file, loff_t off, int whence)
+{
+       struct chlg_reader_state *crs = file->private_data;
+       loff_t pos;
+       int rc;
+
+       switch (whence) {
+       case SEEK_SET:
+               pos = off;
+               break;
+       case SEEK_CUR:
+               pos = file->f_pos + off;
+               break;
+       case SEEK_END:
+       default:
+               return -EINVAL;
+       }
+
+       /* We cannot go backward */
+       if (pos < file->f_pos)
+               return -EINVAL;
+
+       rc = chlg_set_start_offset(crs, pos);
+       if (rc != 0)
+               return rc;
+
+       file->f_pos = pos;
+       return pos;
+}
+
+/**
+ * Clear record range for a given changelog reader.
+ *
+ * @param[in]  crs     Current internal state.
+ * @param[in]  reader  Changelog reader ID (cl1, cl2...)
+ * @param[in]  record  Record index up to which to clear
+ * @return 0 on success, negated error code on failure.
+ */
+static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record)
+{
+       struct obd_device *obd = crs->crs_obd;
+       struct changelog_setinfo cs  = {
+               .cs_recno = record,
+               .cs_id    = reader
+       };
+
+       return obd_set_info_async(NULL, obd->obd_self_export,
+                                 strlen(KEY_CHANGELOG_CLEAR),
+                                 KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL);
+}
+
+/** Maximum changelog control command size */
+#define CHLG_CONTROL_CMD_MAX   64
+
+/**
+ * Handle write() calls on the changelog character device. write() can be used
+ * to request special control operations.
+ *
+ * @param[in]  file  File pointer to the changelog character device
+ * @param[in]  buff  User supplied data (written data)
+ * @param[in]  count Number of written bytes
+ * @param[in]  off   (unused)
+ * @return number of written bytes on success, negated error code on failure.
+ */
+static ssize_t chlg_write(struct file *file, const char __user *buff,
+                         size_t count, loff_t *off)
+{
+       struct chlg_reader_state *crs = file->private_data;
+       char *kbuf;
+       __u64 record;
+       __u32 reader;
+       int rc = 0;
+       ENTRY;
+
+       if (count > CHLG_CONTROL_CMD_MAX)
+               RETURN(-EINVAL);
+
+       OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX);
+       if (kbuf == NULL)
+               RETURN(-ENOMEM);
+
+       if (copy_from_user(kbuf, buff, count))
+               GOTO(out_kbuf, rc = -EFAULT);
+
+       kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0';
+
+       if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2)
+               rc = chlg_clear(crs, reader, record);
+       else
+               rc = -EINVAL;
+
+       EXIT;
+out_kbuf:
+       OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX);
+       return rc < 0 ? rc : count;
+}
+
+/**
+ * Find the OBD device associated with a changelog character device.
+ * @param[in]  cdev  character device instance descriptor
+ * @return corresponding OBD device or NULL if none was found.
+ */
+static struct obd_device *chlg_obd_get(dev_t cdev)
+{
+       int minor = MINOR(cdev);
+       struct obd_device *obd = NULL;
+       struct chlg_registered_dev *curr;
+
+       mutex_lock(&chlg_registered_dev_lock);
+       list_for_each_entry(curr, &chlg_registered_devices, ced_link) {
+               if (curr->ced_misc.minor == minor) {
+                       /* take the first available OBD device attached */
+                       obd = list_first_entry(&curr->ced_obds,
+                                              struct obd_device,
+                                              u.cli.cl_chg_dev_linkage);
+                       break;
+               }
+       }
+       mutex_unlock(&chlg_registered_dev_lock);
+       return obd;
+}
+
+/**
+ * Open handler, initialize internal CRS state and spawn prefetch thread if
+ * needed.
+ * @param[in]  inode  Inode struct for the open character device.
+ * @param[in]  file   Corresponding file pointer.
+ * @return 0 on success, negated error code on failure.
+ */
+static int chlg_open(struct inode *inode, struct file *file)
+{
+       struct chlg_reader_state *crs;
+       struct obd_device *obd = chlg_obd_get(inode->i_rdev);
+       struct task_struct *task;
+       int rc;
+       ENTRY;
+
+       if (!obd)
+               RETURN(-ENODEV);
+
+       OBD_ALLOC_PTR(crs);
+       if (!crs)
+               RETURN(-ENOMEM);
+
+       crs->crs_obd = obd;
+       crs->crs_err = false;
+       crs->crs_eof = false;
+       crs->crs_closed = false;
+
+       mutex_init(&crs->crs_lock);
+       INIT_LIST_HEAD(&crs->crs_rec_queue);
+       init_waitqueue_head(&crs->crs_waitq_prod);
+       init_waitqueue_head(&crs->crs_waitq_cons);
+
+       if (file->f_mode & FMODE_READ) {
+               task = kthread_run(chlg_load, crs, "chlg_load_thread");
+               if (IS_ERR(task)) {
+                       rc = PTR_ERR(task);
+                       CERROR("%s: cannot start changelog thread: rc = %d\n",
+                              obd->obd_name, rc);
+                       GOTO(err_crs, rc);
+               }
+       }
+
+       file->private_data = crs;
+       RETURN(0);
+
+err_crs:
+       OBD_FREE_PTR(crs);
+       return rc;
+}
+
+/**
+ * Close handler, release resources.
+ *
+ * @param[in]  inode  Inode struct for the open character device.
+ * @param[in]  file   Corresponding file pointer.
+ * @return 0 on success, negated error code on failure.
+ */
+static int chlg_release(struct inode *inode, struct file *file)
+{
+       struct chlg_reader_state *crs = file->private_data;
+
+       if (file->f_mode & FMODE_READ) {
+               crs->crs_closed = true;
+               wake_up_all(&crs->crs_waitq_prod);
+       } else {
+               /* No producer thread, release resource ourselves */
+               crs_free(crs);
+       }
+       return 0;
+}
+
+/**
+ * Poll handler, indicates whether the device is readable (new records) and
+ * writable (always).
+ *
+ * @param[in]  file   Device file pointer.
+ * @param[in]  wait   (opaque)
+ * @return combination of the poll status flags.
+ */
+static unsigned int chlg_poll(struct file *file, poll_table *wait)
+{
+       struct chlg_reader_state *crs  = file->private_data;
+       unsigned int mask = 0;
+
+       mutex_lock(&crs->crs_lock);
+       poll_wait(file, &crs->crs_waitq_cons, wait);
+       if (crs->crs_rec_count > 0)
+               mask |= POLLIN | POLLRDNORM;
+       if (crs->crs_err)
+               mask |= POLLERR;
+       if (crs->crs_eof)
+               mask |= POLLHUP;
+       mutex_unlock(&crs->crs_lock);
+       return mask;
+}
+
+static const struct file_operations chlg_fops = {
+       .owner          = THIS_MODULE,
+       .llseek         = chlg_llseek,
+       .read           = chlg_read,
+       .write          = chlg_write,
+       .open           = chlg_open,
+       .release        = chlg_release,
+       .poll           = chlg_poll,
+};
+
+/**
+ * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600"
+ * and returns a name of the form: "changelog-testfs-MDT0000".
+ */
+static void get_chlg_name(char *name, size_t name_len, struct obd_device *obd)
+{
+       int i;
+
+       snprintf(name, name_len, "changelog-%s", obd->obd_name);
+
+       /* Find the 2nd '-' from the end and truncate on it */
+       for (i = 0; i < 2; i++) {
+               char *p = strrchr(name, '-');
+
+               if (p == NULL)
+                       return;
+               *p = '\0';
+       }
+}
+
+/**
+ * Find a changelog character device by name.
+ * All devices registered during MDC setup are listed in a global list with
+ * their names attached.
+ */
+static struct chlg_registered_dev *
+chlg_registered_dev_find_by_name(const char *name)
+{
+       struct chlg_registered_dev *dit;
+
+       list_for_each_entry(dit, &chlg_registered_devices, ced_link)
+               if (strcmp(name, dit->ced_name) == 0)
+                       return dit;
+       return NULL;
+}
+
+/**
+ * Find chlg_registered_dev structure for a given OBD device.
+ * This is bad O(n^2) but for each filesystem:
+ *   - N is # of MDTs times # of mount points
+ *   - this only runs at shutdown
+ */
+static struct chlg_registered_dev *
+chlg_registered_dev_find_by_obd(const struct obd_device *obd)
+{
+       struct chlg_registered_dev *dit;
+       struct obd_device *oit;
+
+       list_for_each_entry(dit, &chlg_registered_devices, ced_link)
+               list_for_each_entry(oit, &dit->ced_obds,
+                                   u.cli.cl_chg_dev_linkage)
+                       if (oit == obd)
+                               return dit;
+       return NULL;
+}
+
+/**
+ * Changelog character device initialization.
+ * Register a misc character device with a dynamic minor number, under a name
+ * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it.
+ *
+ * @param[in] obd  This MDC obd_device.
+ * @return 0 on success, negated error code on failure.
+ */
+int mdc_changelog_cdev_init(struct obd_device *obd)
+{
+       struct chlg_registered_dev *exist;
+       struct chlg_registered_dev *entry;
+       int rc;
+       ENTRY;
+
+       OBD_ALLOC_PTR(entry);
+       if (entry == NULL)
+               RETURN(-ENOMEM);
+
+       get_chlg_name(entry->ced_name, sizeof(entry->ced_name), obd);
+
+       entry->ced_misc.minor = MISC_DYNAMIC_MINOR;
+       entry->ced_misc.name  = entry->ced_name;
+       entry->ced_misc.fops  = &chlg_fops;
+
+       kref_init(&entry->ced_refs);
+       INIT_LIST_HEAD(&entry->ced_obds);
+       INIT_LIST_HEAD(&entry->ced_link);
+
+       mutex_lock(&chlg_registered_dev_lock);
+       exist = chlg_registered_dev_find_by_name(entry->ced_name);
+       if (exist != NULL) {
+               kref_get(&exist->ced_refs);
+               list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds);
+               GOTO(out_unlock, rc = 0);
+       }
+
+       /* Register new character device */
+       rc = misc_register(&entry->ced_misc);
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+
+       list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds);
+       list_add_tail(&entry->ced_link, &chlg_registered_devices);
+
+       entry = NULL;   /* prevent it from being freed below */
+
+out_unlock:
+       mutex_unlock(&chlg_registered_dev_lock);
+       if (entry)
+               OBD_FREE_PTR(entry);
+       RETURN(rc);
+}
+
+/**
+ * Deregister a changelog character device whose refcount has reached zero.
+ */
+static void chlg_dev_clear(struct kref *kref)
+{
+       struct chlg_registered_dev *entry = container_of(kref,
+                                                     struct chlg_registered_dev,
+                                                     ced_refs);
+       ENTRY;
+
+       list_del(&entry->ced_link);
+       misc_deregister(&entry->ced_misc);
+       OBD_FREE_PTR(entry);
+       EXIT;
+}
+
+/**
+ * Release OBD, decrease reference count of the corresponding changelog device.
+ */
+void mdc_changelog_cdev_finish(struct obd_device *obd)
+{
+       struct chlg_registered_dev *dev = chlg_registered_dev_find_by_obd(obd);
+       ENTRY;
+
+       mutex_lock(&chlg_registered_dev_lock);
+       list_del_init(&obd->u.cli.cl_chg_dev_linkage);
+       kref_put(&dev->ced_refs, chlg_dev_clear);
+       mutex_unlock(&chlg_registered_dev_lock);
+       EXIT;
+}
index e723b55..47c55db 100644 (file)
@@ -139,6 +139,11 @@ enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
                              union ldlm_policy_data *policy,
                              enum ldlm_mode mode, struct lustre_handle *lockh);
 
+
+int mdc_changelog_cdev_init(struct obd_device *obd);
+
+void mdc_changelog_cdev_finish(struct obd_device *obd);
+
 static inline int mdc_prep_elc_req(struct obd_export *exp,
                                   struct ptlrpc_request *req, int opc,
                                   struct list_head *cancels, int count)
index 2c45545..acae8a8 100644 (file)
@@ -34,7 +34,6 @@
 
 #include <linux/init.h>
 #include <linux/kthread.h>
-#include <linux/miscdevice.h>
 #include <linux/module.h>
 #include <linux/pagemap.h>
 #include <linux/user_namespace.h>
@@ -1794,171 +1793,6 @@ out:
        return rc;
 }
 
-static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, __u32 flags)
-{
-       struct kuc_hdr *lh = (struct kuc_hdr *)buf;
-
-       LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE);
-
-       lh->kuc_magic = KUC_MAGIC;
-       lh->kuc_transport = KUC_TRANSPORT_CHANGELOG;
-       lh->kuc_flags = flags;
-       lh->kuc_msgtype = CL_RECORD;
-       lh->kuc_msglen = len;
-       return lh;
-}
-
-struct changelog_show {
-       __u64                            cs_startrec;
-       enum changelog_send_flag         cs_flags;
-       struct file                     *cs_fp;
-       char                            *cs_buf;
-       struct obd_device               *cs_obd;
-};
-
-static inline char *cs_obd_name(struct changelog_show *cs)
-{
-       return cs->cs_obd->obd_name;
-}
-
-static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
-                            struct llog_rec_hdr *hdr, void *data)
-{
-       struct changelog_show           *cs = data;
-       struct llog_changelog_rec       *rec = (struct llog_changelog_rec *)hdr;
-       struct kuc_hdr                  *lh;
-       size_t                           len;
-       int                              rc;
-       ENTRY;
-
-       if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
-               rc = -EINVAL;
-               CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
-                      cs_obd_name(cs), rec->cr_hdr.lrh_type,
-                      rec->cr.cr_type, rc);
-               RETURN(rc);
-       }
-
-       if (rec->cr.cr_index < cs->cs_startrec) {
-               /* Skip entries earlier than what we are interested in */
-               CDEBUG(D_HSM, "rec=%llu start=%llu\n",
-                      rec->cr.cr_index, cs->cs_startrec);
-               RETURN(0);
-       }
-
-       CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n",
-              rec->cr.cr_index, rec->cr.cr_type,
-              changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
-              rec->cr.cr_flags & CLF_FLAGMASK,
-              PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
-              rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
-
-       len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
-
-        /* Set up the message */
-        lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags);
-        memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
-
-        rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
-       CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc);
-
-        RETURN(rc);
-}
-
-static int mdc_changelog_send_thread(void *csdata)
-{
-       struct changelog_show   *cs = csdata;
-       struct llog_ctxt        *ctxt = NULL;
-       struct llog_handle      *llh = NULL;
-       struct kuc_hdr          *kuch;
-       enum llog_flag           flags = LLOG_F_IS_CAT;
-       int                      rc;
-
-       CDEBUG(D_HSM, "changelog to fp=%p start %llu\n",
-              cs->cs_fp, cs->cs_startrec);
-
-       OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
-       if (cs->cs_buf == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       /* Set up the remote catalog handle */
-       ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
-       if (ctxt == NULL)
-               GOTO(out, rc = -ENOENT);
-       rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
-                      LLOG_OPEN_EXISTS);
-       if (rc) {
-               CERROR("%s: fail to open changelog catalog: rc = %d\n",
-                      cs_obd_name(cs), rc);
-               GOTO(out, rc);
-       }
-
-       if (cs->cs_flags & CHANGELOG_FLAG_JOBID)
-               flags |= LLOG_F_EXT_JOBID;
-
-       rc = llog_init_handle(NULL, llh, flags, NULL);
-       if (rc) {
-               CERROR("llog_init_handle failed %d\n", rc);
-               GOTO(out, rc);
-       }
-
-       rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
-
-       /* Send EOF no matter what our result */
-       kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
-       kuch->kuc_msgtype = CL_EOF;
-       libcfs_kkuc_msg_put(cs->cs_fp, kuch);
-
-out:
-       fput(cs->cs_fp);
-       if (llh)
-               llog_cat_close(NULL, llh);
-       if (ctxt)
-               llog_ctxt_put(ctxt);
-       if (cs->cs_buf)
-               OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
-       OBD_FREE_PTR(cs);
-       return rc;
-}
-
-static int mdc_ioc_changelog_send(struct obd_device *obd,
-                                  struct ioc_changelog *icc)
-{
-       struct changelog_show *cs;
-       struct task_struct *task;
-       int rc;
-
-        /* Freed in mdc_changelog_send_thread */
-        OBD_ALLOC_PTR(cs);
-        if (!cs)
-                return -ENOMEM;
-
-       cs->cs_obd = obd;
-       cs->cs_startrec = icc->icc_recno;
-       /* matching fput in mdc_changelog_send_thread */
-       cs->cs_fp = fget(icc->icc_id);
-       cs->cs_flags = icc->icc_flags;
-
-       /*
-        * New thread because we should return to user app before
-        * writing into our pipe
-        */
-       task = kthread_run(mdc_changelog_send_thread, cs,
-                          "mdc_clg_send_thread");
-       if (IS_ERR(task)) {
-               rc = PTR_ERR(task);
-               CERROR("%s: cannot start changelog thread: rc = %d\n",
-                      cs_obd_name(cs), rc);
-               OBD_FREE_PTR(cs);
-       } else {
-               rc = 0;
-               CDEBUG(D_HSM, "%s: started changelog thread\n",
-                      cs_obd_name(cs));
-       }
-
-       return rc;
-}
-
 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
                                 struct lustre_kernelcomm *lk);
 
@@ -2058,30 +1892,18 @@ out:
 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
 {
-        struct obd_device *obd = exp->exp_obd;
-        struct obd_ioctl_data *data = karg;
-        struct obd_import *imp = obd->u.cli.cl_import;
-        int rc;
-        ENTRY;
+       struct obd_device *obd = exp->exp_obd;
+       struct obd_ioctl_data *data = karg;
+       struct obd_import *imp = obd->u.cli.cl_import;
+       int rc;
+       ENTRY;
 
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
-        switch (cmd) {
-        case OBD_IOC_CHANGELOG_SEND:
-                rc = mdc_ioc_changelog_send(obd, karg);
-                GOTO(out, rc);
-        case OBD_IOC_CHANGELOG_CLEAR: {
-                struct ioc_changelog *icc = karg;
-                struct changelog_setinfo cs =
-                        {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id};
-                rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR),
-                                        KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
-                                        NULL);
-                GOTO(out, rc);
-        }
+       switch (cmd) {
        case OBD_IOC_FID2PATH:
                rc = mdc_ioc_fid2path(exp, karg);
                GOTO(out, rc);
@@ -2106,31 +1928,31 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        case LL_IOC_HSM_REQUEST:
                rc = mdc_ioc_hsm_request(exp, karg);
                GOTO(out, rc);
-        case OBD_IOC_CLIENT_RECOVER:
-                rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
-                if (rc < 0)
-                        GOTO(out, rc);
-                GOTO(out, rc = 0);
-        case IOC_OSC_SET_ACTIVE:
-                rc = ptlrpc_set_import_active(imp, data->ioc_offset);
-                GOTO(out, rc);
-        case OBD_IOC_PING_TARGET:
-                rc = ptlrpc_obd_ping(obd);
-                GOTO(out, rc);
-        /*
-         * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
-         * LMV instead of MDC. But when the cluster is upgraded from 1.8,
-         * there'd be no LMV layer thus we might be called here. Eventually
-         * this code should be removed.
-         * bz20731, LU-592.
-         */
-        case IOC_OBD_STATFS: {
-                struct obd_statfs stat_buf = {0};
+       case OBD_IOC_CLIENT_RECOVER:
+               rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
+               if (rc < 0)
+                       GOTO(out, rc);
+               GOTO(out, rc = 0);
+       case IOC_OSC_SET_ACTIVE:
+               rc = ptlrpc_set_import_active(imp, data->ioc_offset);
+               GOTO(out, rc);
+       case OBD_IOC_PING_TARGET:
+               rc = ptlrpc_obd_ping(obd);
+               GOTO(out, rc);
+       /*
+        * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
+        * LMV instead of MDC. But when the cluster is upgraded from 1.8,
+        * there'd be no LMV layer thus we might be called here. Eventually
+        * this code should be removed.
+        * bz20731, LU-592.
+        */
+       case IOC_OBD_STATFS: {
+               struct obd_statfs stat_buf = {0};
 
-                if (*((__u32 *) data->ioc_inlbuf2) != 0)
-                        GOTO(out, rc = -ENODEV);
+               if (*((__u32 *) data->ioc_inlbuf2) != 0)
+                       GOTO(out, rc = -ENODEV);
 
-                /* copy UUID */
+               /* copy UUID */
                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
                                 min((int)data->ioc_plen2,
                                     (int)sizeof(struct obd_uuid))))
@@ -2143,12 +1965,12 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                        GOTO(out, rc);
 
                if (copy_to_user(data->ioc_pbuf1, &stat_buf,
-                                     min((int) data->ioc_plen1,
-                                         (int) sizeof(stat_buf))))
-                        GOTO(out, rc = -EFAULT);
+                                    min((int) data->ioc_plen1,
+                                        (int) sizeof(stat_buf))))
+                       GOTO(out, rc = -EFAULT);
 
-                GOTO(out, rc = 0);
-        }
+               GOTO(out, rc = 0);
+       }
        case OBD_IOC_QUOTACTL: {
                struct if_quotactl *qctl = karg;
                struct obd_quotactl *oqctl;
@@ -2663,16 +2485,28 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
        rc = mdc_llog_init(obd);
         if (rc) {
-                mdc_cleanup(obd);
-                CERROR("failed to setup llogging subsystems\n");
-               RETURN(rc);
+                CERROR("%s: failed to setup llogging subsystems: rc = %d\n",
+                      obd->obd_name, rc);
+               GOTO(err_mdc_cleanup, rc);
         }
 
-        RETURN(rc);
+       rc = mdc_changelog_cdev_init(obd);
+       if (rc) {
+               CERROR("%s: failed to setup changelog char device: rc = %d\n",
+                      obd->obd_name, rc);
+               GOTO(err_mdc_cleanup, rc);
+       }
+
+       EXIT;
+err_mdc_cleanup:
+       if (rc)
+               client_obd_cleanup(obd);
 
 err_ptlrpcd_decref:
-        ptlrpcd_decref();
-        RETURN(rc);
+       if (rc)
+               ptlrpcd_decref();
+
+        return rc;
 }
 
 /* Initialize the default and maximum LOV EA sizes.  This allows
@@ -2707,6 +2541,8 @@ static int mdc_precleanup(struct obd_device *obd)
        if (obd->obd_type->typ_refcnt <= 1)
                libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
 
+       mdc_changelog_cdev_finish(obd);
+
        obd_cleanup_client_import(obd);
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
index 9e42a93..4e3d03c 100644 (file)
@@ -96,7 +96,7 @@ liblustreapitmp_a_SOURCES = liblustreapi.c liblustreapi_hsm.c \
                            liblustreapi_kernelconn.c liblustreapi_param.c \
                            $(top_builddir)/libcfs/libcfs/util/string.c \
                            $(top_builddir)/libcfs/libcfs/util/param.c \
-                           liblustreapi_ladvise.c
+                           liblustreapi_ladvise.c liblustreapi_chlg.c
 if UTILS
 # build static and shared lib lustreapi
 liblustreapi.a : liblustreapitmp.a
index cf27a9b..fd0256d 100644 (file)
@@ -3662,213 +3662,6 @@ int root_ioctl(const char *mdtname, int opc, void *data, int *mdtidxp,
         return rc;
 }
 
-/****** Changelog API ********/
-
-static int changelog_ioctl(const char *mdtname, int opc, int id,
-                           long long recno, int flags)
-{
-        struct ioc_changelog data;
-        int *idx;
-
-        data.icc_id = id;
-        data.icc_recno = recno;
-        data.icc_flags = flags;
-        idx = (int *)(&data.icc_mdtindex);
-
-        return root_ioctl(mdtname, opc, &data, idx, WANT_ERROR);
-}
-
-#define CHANGELOG_PRIV_MAGIC 0xCA8E1080
-struct changelog_private {
-       int                             magic;
-       enum changelog_send_flag        flags;
-       struct lustre_kernelcomm        kuc;
-};
-
-/** Start reading from a changelog
- * @param priv Opaque private control structure
- * @param flags Start flags (e.g. CHANGELOG_FLAG_BLOCK)
- * @param device Report changes recorded on this MDT
- * @param startrec Report changes beginning with this record number
- * (just call llapi_changelog_fini when done; don't need an endrec)
- */
-int llapi_changelog_start(void **priv, enum changelog_send_flag flags,
-                         const char *device, long long startrec)
-{
-       struct changelog_private        *cp;
-       static bool                      warned;
-       int                              rc;
-
-       /* Set up the receiver control struct */
-       cp = calloc(1, sizeof(*cp));
-       if (cp == NULL)
-               return -ENOMEM;
-
-       cp->magic = CHANGELOG_PRIV_MAGIC;
-       cp->flags = flags;
-
-       /* Set up the receiver */
-       rc = libcfs_ukuc_start(&cp->kuc, 0 /* no group registration */, 0);
-       if (rc < 0)
-               goto out_free;
-
-       *priv = cp;
-
-       /* CHANGELOG_FLAG_JOBID will eventually become mandatory. Display a
-        * warning if it's missing. */
-       if (!(flags & CHANGELOG_FLAG_JOBID) && !warned) {
-               llapi_err_noerrno(LLAPI_MSG_WARN, "warning: %s() called "
-                                 "w/o CHANGELOG_FLAG_JOBID", __func__);
-               warned = true;
-       }
-
-       /* Tell the kernel to start sending */
-       rc = changelog_ioctl(device, OBD_IOC_CHANGELOG_SEND, cp->kuc.lk_wfd,
-                            startrec, flags);
-       /* Only the kernel reference keeps the write side open */
-       close(cp->kuc.lk_wfd);
-       cp->kuc.lk_wfd = LK_NOFD;
-       if (rc < 0) {
-               /* frees and clears priv */
-               llapi_changelog_fini(priv);
-               return rc;
-       }
-
-       return 0;
-
-out_free:
-       free(cp);
-       return rc;
-}
-
-/** Finish reading from a changelog */
-int llapi_changelog_fini(void **priv)
-{
-        struct changelog_private *cp = (struct changelog_private *)*priv;
-
-        if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC))
-                return -EINVAL;
-
-        libcfs_ukuc_stop(&cp->kuc);
-        free(cp);
-        *priv = NULL;
-        return 0;
-}
-
-/**
- * Convert all records to a same format according to the caller's wishes.
- * Default is CLF_VERSION | CLF_RENAME.
- * Add CLF_JOBID if explicitely requested.
- *
- * \param rec  The record to remap. It is expected to be big enough to
- *             properly handle the final format.
- * \return 1 if anything changed. 0 otherwise.
- */
-/** Read the next changelog entry
- * @param priv Opaque private control structure
- * @param rech Changelog record handle; record will be allocated here
- * @return 0 valid message received; rec is set
- *         <0 error code
- *         1 EOF
- */
-#define DEFAULT_RECORD_FMT     (CLF_VERSION | CLF_RENAME)
-int llapi_changelog_recv(void *priv, struct changelog_rec **rech)
-{
-       struct changelog_private        *cp = (struct changelog_private *)priv;
-       struct kuc_hdr                  *kuch;
-       enum changelog_rec_flags         rec_fmt = DEFAULT_RECORD_FMT;
-       int                              rc = 0;
-
-       if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC))
-               return -EINVAL;
-       if (rech == NULL)
-               return -EINVAL;
-       kuch = malloc(KUC_CHANGELOG_MSG_MAXSIZE);
-       if (kuch == NULL)
-               return -ENOMEM;
-
-       if (cp->flags & CHANGELOG_FLAG_JOBID)
-               rec_fmt |= CLF_JOBID;
-
-repeat:
-       rc = libcfs_ukuc_msg_get(&cp->kuc, (char *)kuch,
-                                KUC_CHANGELOG_MSG_MAXSIZE,
-                                KUC_TRANSPORT_CHANGELOG);
-       if (rc < 0)
-               goto out_free;
-
-        if ((kuch->kuc_transport != KUC_TRANSPORT_CHANGELOG) ||
-            ((kuch->kuc_msgtype != CL_RECORD) &&
-             (kuch->kuc_msgtype != CL_EOF))) {
-                llapi_err_noerrno(LLAPI_MSG_ERROR,
-                                  "Unknown changelog message type %d:%d\n",
-                                  kuch->kuc_transport, kuch->kuc_msgtype);
-                rc = -EPROTO;
-                goto out_free;
-        }
-
-        if (kuch->kuc_msgtype == CL_EOF) {
-                if (cp->flags & CHANGELOG_FLAG_FOLLOW) {
-                        /* Ignore EOFs */
-                        goto repeat;
-                } else {
-                        rc = 1;
-                        goto out_free;
-                }
-        }
-
-       /* Our message is a changelog_rec.  Use pointer math to skip
-        * kuch_hdr and point directly to the message payload. */
-       *rech = (struct changelog_rec *)(kuch + 1);
-       changelog_remap_rec(*rech, rec_fmt);
-
-        return 0;
-
-out_free:
-        *rech = NULL;
-        free(kuch);
-        return rc;
-}
-
-/** Release the changelog record when done with it. */
-int llapi_changelog_free(struct changelog_rec **rech)
-{
-        if (*rech) {
-                /* We allocated memory starting at the kuc_hdr, but passed
-                 * the consumer a pointer to the payload.
-                 * Use pointer math to get back to the header.
-                 */
-                struct kuc_hdr *kuch = (struct kuc_hdr *)*rech - 1;
-                free(kuch);
-        }
-        *rech = NULL;
-        return 0;
-}
-
-int llapi_changelog_clear(const char *mdtname, const char *idstr,
-                          long long endrec)
-{
-       long id;
-
-        if (endrec < 0) {
-                llapi_err_noerrno(LLAPI_MSG_ERROR,
-                                  "can't purge negative records\n");
-                return -EINVAL;
-        }
-
-        id = strtol(idstr + strlen(CHANGELOG_USER_PREFIX), NULL, 10);
-        if ((id == 0) || (strncmp(idstr, CHANGELOG_USER_PREFIX,
-                                  strlen(CHANGELOG_USER_PREFIX)) != 0)) {
-                llapi_err_noerrno(LLAPI_MSG_ERROR,
-                                  "expecting id of the form '"
-                                  CHANGELOG_USER_PREFIX
-                                  "<num>'; got '%s'\n", idstr);
-                return -EINVAL;
-        }
-
-        return changelog_ioctl(mdtname, OBD_IOC_CHANGELOG_CLEAR, id, endrec, 0);
-}
-
 int llapi_fid2path(const char *device, const char *fidstr, char *buf,
                   int buflen, long long *recno, int *linkno)
 {
diff --git a/lustre/utils/liblustreapi_chlg.c b/lustre/utils/liblustreapi_chlg.c
new file mode 100644 (file)
index 0000000..f4661be
--- /dev/null
@@ -0,0 +1,311 @@
+/*
+ * LGPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * (C) Copyright 2017 Commissariat a l'energie atomique et aux energies
+ *     alternatives
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the GNU Lesser General Public License
+ * (LGPL) version 2.1 or (at your discretion) any later version.
+ * (LGPL) version 2.1 accompanies this distribution, and is available at
+ * http://www.gnu.org/licenses/lgpl-2.1.html
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * LGPL HEADER END
+ */
+/*
+ * lustre/utils/liblustreapi_chlg.c
+ *
+ * lustreapi library for filesystem changelog
+ *
+ * Author: Henri Doreau <henri.doreau@cea.fr>
+ */
+
+#include <fcntl.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <poll.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include <lustre/lustreapi.h>
+
+
+/**
+ * Build the path of the changelog character device of an MDT.
+ *
+ * @param[out] path      Buffer receiving "/dev/changelog-<device>"
+ * @param[in]  path_len  Size of \a path in bytes
+ * @param[in]  device    Device name appended after the "changelog-" prefix
+ *
+ * @return 0 on success, -EIO if formatting failed, -EOVERFLOW if the
+ *         resulting path does not fit in \a path_len bytes.
+ */
+static int chlg_dev_path(char *path, size_t path_len, const char *device)
+{
+       int written = snprintf(path, path_len, "/dev/changelog-%s", device);
+
+       if (written < 0)
+               return -EIO;
+
+       /* snprintf() reports the untruncated length, terminator excluded */
+       return (size_t)written < path_len ? 0 : -EOVERFLOW;
+}
+
+#define CHANGELOG_PRIV_MAGIC 0xCA8E1080
+#define CHANGELOG_BUFFER_SZ  4096
+
+/**
+ * Record state for efficient changelog consumption.
+ * Read chunks of CHANGELOG_BUFFER_SZ bytes.
+ *
+ * The structure is allocated with CHANGELOG_BUFFER_SZ extra bytes, which
+ * back the trailing clp_buf flexible array member.
+ */
+struct changelog_private {
+       /* Ensure that the structure is valid and initialized.
+        * Unsigned: CHANGELOG_PRIV_MAGIC does not fit in a signed int. */
+       unsigned int                     clp_magic;
+       /* File descriptor on the changelog character device */
+       int                              clp_fd;
+       /* Changelog delivery mode */
+       enum changelog_send_flag         clp_send_flags;
+       /* Available bytes in buffer */
+       size_t                           clp_buf_len;
+       /* Current position in buffer */
+       char                            *clp_buf_pos;
+       /* Read buffer with records read from system */
+       char                             clp_buf[];
+};
+
+/**
+ * Start reading from a changelog
+ *
+ * Opens the changelog character device of \a device read-only and, when
+ * \a startrec is non-zero, seeks to that record number.
+ *
+ * @param priv      Opaque private control structure, set on success only
+ * @param flags     Start flags (e.g. CHANGELOG_FLAG_JOBID)
+ * @param device    Report changes recorded on this MDT
+ * @param startrec  Report changes beginning with this record number
+ * (just call llapi_changelog_fini when done; don't need an endrec)
+ *
+ * @return 0 on success, negated errno code on failure.
+ */
+int llapi_changelog_start(void **priv, enum changelog_send_flag flags,
+                         const char *device, long long startrec)
+{
+       struct changelog_private *cp;
+       static bool warned_jobid;
+       static bool warned_follow;
+       char cdev_path[PATH_MAX];
+       int rc;
+
+       rc = chlg_dev_path(cdev_path, sizeof(cdev_path), device);
+       if (rc != 0)
+               return rc;
+
+       /* Set up the receiver control struct */
+       cp = calloc(1, sizeof(*cp) + CHANGELOG_BUFFER_SZ);
+       if (cp == NULL)
+               return -ENOMEM;
+
+       cp->clp_magic = CHANGELOG_PRIV_MAGIC;
+       cp->clp_send_flags = flags;
+
+       cp->clp_buf_len = 0;
+       cp->clp_buf_pos = cp->clp_buf;
+
+       /* Set up the receiver */
+       cp->clp_fd = open(cdev_path, O_RDONLY);
+       if (cp->clp_fd < 0) {
+               rc = -errno;
+               goto out_free_cp;
+       }
+
+       if (startrec != 0) {
+               /* Keep the result in an off_t: record numbers above INT_MAX
+                * would otherwise be truncated and could be mistaken for a
+                * failure. */
+               off_t pos = lseek(cp->clp_fd, startrec, SEEK_SET);
+
+               if (pos < 0) {
+                       rc = -errno;
+                       goto out_close;
+               }
+       }
+
+       *priv = cp;
+
+       /* CHANGELOG_FLAG_JOBID will eventually become mandatory. Display a
+        * warning if it's missing. */
+       if (!(flags & CHANGELOG_FLAG_JOBID) && !warned_jobid) {
+               llapi_err_noerrno(LLAPI_MSG_WARN, "warning: %s() called "
+                                 "without CHANGELOG_FLAG_JOBID", __func__);
+               warned_jobid = true;
+       }
+
+       /* Behavior expected by CHANGELOG_FLAG_FOLLOW is not implemented, warn
+        * the user and ignore it. */
+       if ((flags & CHANGELOG_FLAG_FOLLOW) && !warned_follow) {
+               llapi_err_noerrno(LLAPI_MSG_WARN, "warning: %s() called with "
+                                 "CHANGELOG_FLAG_FOLLOW (ignored)", __func__);
+               warned_follow = true;
+       }
+
+       return 0;
+
+out_close:
+       close(cp->clp_fd);
+out_free_cp:
+       free(cp);
+       return rc;
+}
+
+/**
+ * Finish reading from a changelog
+ *
+ * Closes the changelog device and releases the reader state.
+ *
+ * @param priv  Address of the opaque reader structure; reset to NULL on
+ *              success
+ * @return 0 on success, -EINVAL if \a *priv is not a valid reader.
+ */
+int llapi_changelog_fini(void **priv)
+{
+       struct changelog_private *cp = *priv;
+
+       if (cp == NULL)
+               return -EINVAL;
+
+       if (cp->clp_magic != CHANGELOG_PRIV_MAGIC)
+               return -EINVAL;
+
+       close(cp->clp_fd);
+       *priv = NULL;
+       free(cp);
+
+       return 0;
+}
+
+/**
+ * Refill the reader buffer with a single read() on the changelog device.
+ *
+ * On success the buffer position is rewound to the start of the buffer and
+ * the available length is set to the number of bytes read.
+ *
+ * @param cp  Changelog reader state
+ * @return number of bytes read (0 at end of file), -EINVAL if \a cp is not a
+ *         valid reader, or negated errno code if read() failed.
+ */
+static ssize_t chlg_read_bulk(struct changelog_private *cp)
+{
+       ssize_t nread;
+
+       if (cp == NULL)
+               return -EINVAL;
+
+       if (cp->clp_magic != CHANGELOG_PRIV_MAGIC)
+               return -EINVAL;
+
+       nread = read(cp->clp_fd, cp->clp_buf, CHANGELOG_BUFFER_SZ);
+       if (nread < 0)
+               return -errno;
+
+       cp->clp_buf_len = nread;
+       cp->clp_buf_pos = cp->clp_buf;
+
+       return nread;
+}
+
+/**
+ * Returns a file descriptor to poll on.
+ *
+ * @param[in]  priv  Opaque changelog reader structure.
+ * @return the reader's file descriptor on the changelog device on success,
+ *         -EINVAL if \a priv is not a valid reader.
+ */
+int llapi_changelog_get_fd(void *priv)
+{
+       struct changelog_private *cp = priv;
+
+       if (cp != NULL && cp->clp_magic == CHANGELOG_PRIV_MAGIC)
+               return cp->clp_fd;
+
+       return -EINVAL;
+}
+
+/** Read the next changelog entry
+ *
+ * Records are served from the reader's internal buffer, which is refilled
+ * from the changelog device when exhausted. The record is copied into a
+ * freshly allocated CR_MAXSIZE buffer and remapped to the requested format.
+ *
+ * @param priv Opaque private control structure
+ * @param rech Changelog record handle; record will be allocated here and
+ *             must be released with llapi_changelog_free(). Set to NULL
+ *             when no record is returned.
+ * @return 0 valid message received; rec is set
+ *      <0 error code (-EPROTO if the buffered record is malformed)
+ *      1 EOF
+ */
+#define DEFAULT_RECORD_FMT     (CLF_VERSION | CLF_RENAME)
+int llapi_changelog_recv(void *priv, struct changelog_rec **rech)
+{
+       struct changelog_private *cp = priv;
+       enum changelog_rec_flags rec_fmt = DEFAULT_RECORD_FMT;
+       struct changelog_rec *tmp;
+       size_t avail;
+       size_t rec_len;
+       int rc = 0;
+
+       if (!cp || (cp->clp_magic != CHANGELOG_PRIV_MAGIC))
+               return -EINVAL;
+
+       if (rech == NULL)
+               return -EINVAL;
+
+       *rech = malloc(CR_MAXSIZE);
+       if (*rech == NULL)
+               return -ENOMEM;
+
+       if (cp->clp_send_flags & CHANGELOG_FLAG_JOBID)
+               rec_fmt |= CLF_JOBID;
+
+       if (cp->clp_buf + cp->clp_buf_len <= cp->clp_buf_pos) {
+               ssize_t refresh;
+
+               refresh = chlg_read_bulk(cp);
+               if (refresh == 0) {
+                       /* EOF, CHANGELOG_FLAG_FOLLOW ignored for now LU-7659 */
+                       rc = 1;
+                       goto out_free;
+               } else if (refresh < 0) {
+                       rc = refresh;
+                       goto out_free;
+               }
+       }
+
+       /* At this point the buffer holds at least one unread byte. */
+       avail = cp->clp_buf + cp->clp_buf_len - cp->clp_buf_pos;
+       tmp = (struct changelog_rec *)cp->clp_buf_pos;
+
+       /* The fixed part of the record must be present before its size
+        * fields can be trusted. */
+       if (avail < sizeof(*tmp)) {
+               rc = -EPROTO;
+               goto out_drop;
+       }
+
+       /* Never copy past the buffered data nor past the destination. */
+       rec_len = changelog_rec_size(tmp) + tmp->cr_namelen;
+       if (rec_len > avail || rec_len > CR_MAXSIZE) {
+               rc = -EPROTO;
+               goto out_drop;
+       }
+
+       memcpy(*rech, cp->clp_buf_pos, rec_len);
+
+       cp->clp_buf_pos += rec_len;
+       changelog_remap_rec(*rech, rec_fmt);
+
+       return 0;
+
+out_drop:
+       /* Discard the rest of the buffer so that the next call reads fresh
+        * data instead of failing on the same bytes again. */
+       cp->clp_buf_pos = cp->clp_buf + cp->clp_buf_len;
+out_free:
+       free(*rech);
+       *rech = NULL;
+       return rc;
+}
+
+/**
+ * Release the changelog record when done with it.
+ *
+ * @param rech  Address of the record handle; reset to NULL
+ * @return 0 always.
+ */
+int llapi_changelog_free(struct changelog_rec **rech)
+{
+       struct changelog_rec *rec = *rech;
+
+       *rech = NULL;
+       free(rec);
+
+       return 0;
+}
+
+/**
+ * Clear changelog records for a changelog user.
+ *
+ * Writes the command "clear:<idstr>:<endrec>" (terminating NUL included)
+ * into the changelog character device of \a mdtname.
+ *
+ * @param mdtname  MDT whose changelog device receives the command
+ * @param idstr    Changelog user identifier, passed through as is
+ * @param endrec   Record number placed in the command; must not be negative
+ *
+ * @return 0 on success, negated errno code on failure.
+ */
+int llapi_changelog_clear(const char *mdtname, const char *idstr,
+                         long long endrec)
+{
+       char dev_path[PATH_MAX];
+       char cmd[64];
+       size_t cmd_len = sizeof(cmd);
+       int fd;
+       int rc;
+
+       if (endrec < 0) {
+               llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                 "can't purge negative records\n");
+               return -EINVAL;
+       }
+
+       /* Do not open a truncated or unset path if the name does not fit */
+       rc = chlg_dev_path(dev_path, sizeof(dev_path), mdtname);
+       if (rc != 0)
+               return rc;
+
+       rc = snprintf(cmd, cmd_len, "clear:%s:%lld", idstr, endrec);
+       if (rc < 0 || (size_t)rc >= sizeof(cmd))
+               return -EINVAL;
+
+       /* The terminating NUL is sent along with the command */
+       cmd_len = rc + 1;
+
+       fd = open(dev_path, O_WRONLY);
+       if (fd < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc, "cannot open '%s'", dev_path);
+               return rc;
+       }
+
+       rc = write(fd, cmd, cmd_len);
+       if (rc < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot purge records for '%s'", idstr);
+               goto out_close;
+       }
+
+       rc = 0;
+
+out_close:
+       close(fd);
+       return rc;
+}