From 1d40214d96dd6e36bd39a35f8419f753bae8d305 Mon Sep 17 00:00:00 2001 From: Henri Doreau Date: Wed, 13 Jan 2016 15:53:00 +0100 Subject: [PATCH] LU-7659 mdc: expose changelog through char devices Register one character device per MDT in order to allow non-llapi to read them and to make delivery more efficient. - open() spawns a thread to prefetch records and enqueue them into a local buffer (unless the device is open in write-only mode). - lseek() can be used to jump to a specific record, in which case the offset is a record number (with SEEK_SET) or a number of records to skip (SEEK_CUR). Movement can only be done forward. - read() copies records to userland. No truncation happens, so short reads are likely. - write() is used to transmit control commands to the device. The only available one is changelog_clear, which is done by writing "clear:cl:" into the device. - close() terminates the prefetch thread if any, and releases resources. It is possible to poll() on the device to get notified when new records are available for read. Signed-off-by: Henri Doreau Change-Id: I14709fdbac76b5512e58099e4e536cf9c973868c Reviewed-on: https://review.whamcloud.com/18900 Reviewed-by: Andreas Dilger Tested-by: Jenkins Tested-by: Maloo Reviewed-by: John L. Hammond Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_user.h | 7 - lustre/include/lustre/lustreapi.h | 1 + lustre/include/lustre_ioctl.h | 2 +- lustre/include/obd.h | 4 +- lustre/include/uapi_kernelcomm.h | 2 - lustre/ldlm/ldlm_lib.c | 2 + lustre/llite/dir.c | 8 - lustre/lmv/lmv_obd.c | 13 - lustre/mdc/Makefile.in | 7 +- lustre/mdc/mdc_changelog.c | 734 ++++++++++++++++++++++++++++++++++++ lustre/mdc/mdc_internal.h | 5 + lustre/mdc/mdc_request.c | 272 +++---------- lustre/utils/Makefile.am | 2 +- lustre/utils/liblustreapi.c | 207 ---------- lustre/utils/liblustreapi_chlg.c | 311 +++++++++++++++ 15 files changed, 1118 insertions(+), 459 deletions(-) create mode 100644 lustre/mdc/mdc_changelog.c create mode 100644 lustre/utils/liblustreapi_chlg.c diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index 6b33e95..52f59ad 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -992,13 +992,6 @@ static inline void changelog_remap_rec(struct changelog_rec *rec, rec->cr_flags = (rec->cr_flags & CLF_FLAGMASK) | crf_wanted; } -struct ioc_changelog { - __u64 icc_recno; - __u32 icc_mdtindex; - __u32 icc_id; - __u32 icc_flags; -}; - enum changelog_message_type { CL_RECORD = 10, /* message is a changelog_rec */ CL_EOF = 11, /* at end of current changelog */ diff --git a/lustre/include/lustre/lustreapi.h b/lustre/include/lustre/lustreapi.h index 6c0a17f..07484ca 100644 --- a/lustre/include/lustre/lustreapi.h +++ b/lustre/include/lustre/lustreapi.h @@ -347,6 +347,7 @@ extern int llapi_changelog_start(void **priv, enum changelog_send_flag flags, extern int llapi_changelog_fini(void **priv); extern int llapi_changelog_recv(void *priv, struct changelog_rec **rech); extern int llapi_changelog_free(struct changelog_rec **rech); +extern int llapi_changelog_get_fd(void *priv); /* Allow records up to endrec to be destroyed; requires registered id. */ extern int llapi_changelog_clear(const char *mdtname, const char *idstr, long long endrec); diff --git a/lustre/include/lustre_ioctl.h b/lustre/include/lustre_ioctl.h index fbd0f03..fa63359 100644 --- a/lustre/include/lustre_ioctl.h +++ b/lustre/include/lustre_ioctl.h @@ -373,7 +373,7 @@ obd_ioctl_unpack(struct obd_ioctl_data *data, char *pbuf, int max_len) #define OBD_GET_VERSION _IOWR('f', 144, OBD_IOC_DATA_TYPE) /* OBD_IOC_GSS_SUPPORT _IOWR('f', 145, OBD_IOC_DATA_TYPE) */ /* OBD_IOC_CLOSE_UUID _IOWR('f', 147, OBD_IOC_DATA_TYPE) */ -#define OBD_IOC_CHANGELOG_SEND _IOW ('f', 148, OBD_IOC_DATA_TYPE) +/* OBD_IOC_CHANGELOG_SEND _IOW ('f', 148, OBD_IOC_DATA_TYPE) */ #define OBD_IOC_GETDEVICE _IOWR('f', 149, OBD_IOC_DATA_TYPE) #define OBD_IOC_FID2PATH _IOWR('f', 150, OBD_IOC_DATA_TYPE) /* lustre/lustre_user.h 151-153 */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index b399149..455e232 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -325,13 +325,15 @@ struct client_obd { struct lu_client_seq *cl_seq; struct rw_semaphore cl_seq_rwsem; - atomic_t cl_resends; /* resend count */ + atomic_t cl_resends; /* resend count */ /* ptlrpc work for writeback in ptlrpcd context */ void *cl_writeback_work; void *cl_lru_work; /* hash tables for osc_quota_info */ struct cfs_hash *cl_quota_hash[LL_MAXQUOTAS]; + /* Links to the global list of registered changelog devices */ + struct list_head cl_chg_dev_linkage; }; #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid) diff --git a/lustre/include/uapi_kernelcomm.h b/lustre/include/uapi_kernelcomm.h index 208002c..e8119f5 100644 --- a/lustre/include/uapi_kernelcomm.h +++ b/lustre/include/uapi_kernelcomm.h @@ -51,7 +51,6 @@ struct kuc_hdr { __u16 kuc_msglen; /* Including header */ } __attribute__((aligned(sizeof(__u64)))); -#define KUC_CHANGELOG_MSG_MAXSIZE (sizeof(struct kuc_hdr)+CR_MAXSIZE) #define KUC_MAGIC 0x191C /*Lustre9etLinC */ @@ -59,7 +58,6 @@ struct kuc_hdr { enum kuc_transport_type { KUC_TRANSPORT_GENERIC = 1, KUC_TRANSPORT_HSM = 2, - KUC_TRANSPORT_CHANGELOG = 3, }; enum kuc_generic_message_type { diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index ced1198..268ae920 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -436,6 +436,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) init_waitqueue_head(&cli->cl_mod_rpcs_waitq); cli->cl_mod_tag_bitmap = NULL; + INIT_LIST_HEAD(&cli->cl_chg_dev_linkage); + if (connect_op == MDS_CONNECT) { cli->cl_max_mod_rpcs_in_flight = cli->cl_max_rpcs_in_flight - 1; OBD_ALLOC(cli->cl_mod_tag_bitmap, diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index b171f3e..afa1020 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -1546,14 +1546,6 @@ out_rmdir: RETURN(obd_iocontrol(cmd, sbi->ll_md_exp, 0, NULL, (void __user *)arg)); } - case OBD_IOC_CHANGELOG_SEND: - case OBD_IOC_CHANGELOG_CLEAR: - if (!cfs_capable(CFS_CAP_SYS_ADMIN)) - RETURN(-EPERM); - - rc = copy_and_ioctl(cmd, sbi->ll_md_exp, (void __user *)arg, - sizeof(struct ioc_changelog)); - RETURN(rc); case OBD_IOC_FID2PATH: RETURN(ll_fid2path(inode, (void __user *)arg)); case LL_IOC_GETPARENT: diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 74166fb..5be99ee 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -985,19 +985,6 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, OBD_FREE_PTR(oqctl); break; } - case OBD_IOC_CHANGELOG_SEND: - case OBD_IOC_CHANGELOG_CLEAR: { - struct ioc_changelog *icc = karg; - - if (icc->icc_mdtindex >= count) - RETURN(-ENODEV); - - tgt = lmv->tgts[icc->icc_mdtindex]; - if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) - RETURN(-ENODEV); - rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL); - break; - } case LL_IOC_GET_CONNECT_FLAGS: { tgt = lmv->tgts[0]; if (tgt == NULL || tgt->ltd_exp == NULL) diff --git a/lustre/mdc/Makefile.in b/lustre/mdc/Makefile.in index f007298..d29d31e 100644 --- a/lustre/mdc/Makefile.in +++ b/lustre/mdc/Makefile.in @@ -1,5 +1,10 @@ MODULES := mdc -mdc-objs := mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o mdc_locks.o +mdc-objs := mdc_request.o \ + mdc_reint.o \ + lproc_mdc.o \ + mdc_lib.o \ + mdc_locks.o \ + mdc_changelog.o EXTRA_DIST = $(mdc-objs:.o=.c) mdc_internal.h diff --git a/lustre/mdc/mdc_changelog.c b/lustre/mdc/mdc_changelog.c new file mode 100644 index 0000000..3ae27f9 --- /dev/null +++ b/lustre/mdc/mdc_changelog.c @@ -0,0 +1,734 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright (c) 2017, Commissariat a l'Energie Atomique et aux Energies + * Alternatives. + * + * Author: Henri Doreau + */ + +#define DEBUG_SUBSYSTEM S_MDC + +#include +#include +#include +#include + +#include + +#include "mdc_internal.h" + + +/* + * -- Changelog delivery through character device -- + */ + +/** + * Mutex to protect chlg_registered_devices below + */ +static DEFINE_MUTEX(chlg_registered_dev_lock); + +/** + * Global linked list of all registered devices (one per MDT). + */ +static LIST_HEAD(chlg_registered_devices); + + +struct chlg_registered_dev { + /* Device name of the form "changelog-{MDTNAME}" */ + char ced_name[32]; + /* Misc device descriptor */ + struct miscdevice ced_misc; + /* OBDs referencing this device (multiple mount point) */ + struct list_head ced_obds; + /* Reference counter for proper deregistration */ + struct kref ced_refs; + /* Link within the global chlg_registered_devices */ + struct list_head ced_link; +}; + +struct chlg_reader_state { + /* Shortcut to the corresponding OBD device */ + struct obd_device *crs_obd; + /* An error occurred that prevents from reading further */ + bool crs_err; + /* EOF, no more records available */ + bool crs_eof; + /* Userland reader closed connection */ + bool crs_closed; + /* Desired start position */ + __u64 crs_start_offset; + /* Wait queue for the catalog processing thread */ + wait_queue_head_t crs_waitq_prod; + /* Wait queue for the record copy threads */ + wait_queue_head_t crs_waitq_cons; + /* Mutex protecting crs_rec_count and crs_rec_queue */ + struct mutex crs_lock; + /* Number of item in the list */ + __u64 crs_rec_count; + /* List of prefetched enqueued_record::enq_linkage_items */ + struct list_head crs_rec_queue; +}; + +struct chlg_rec_entry { + /* Link within the chlg_reader_state::crs_rec_queue list */ + struct list_head enq_linkage; + /* Data (enq_record) field length */ + __u64 enq_length; + /* Copy of a changelog record (see struct llog_changelog_rec) */ + struct changelog_rec enq_record[]; +}; + +enum { + /* Number of records to prefetch locally. */ + CDEV_CHLG_MAX_PREFETCH = 1024, +}; + +/** + * ChangeLog catalog processing callback invoked on each record. + * If the current record is eligible to userland delivery, push + * it into the crs_rec_queue where the consumer code will fetch it. + * + * @param[in] env (unused) + * @param[in] llh Client-side handle used to identify the llog + * @param[in] hdr Header of the current llog record + * @param[in,out] data chlg_reader_state passed from caller + * + * @return 0 or LLOG_PROC_* control code on success, negated error on failure. + */ +static int chlg_read_cat_process_cb(const struct lu_env *env, + struct llog_handle *llh, + struct llog_rec_hdr *hdr, void *data) +{ + struct llog_changelog_rec *rec; + struct chlg_reader_state *crs = data; + struct chlg_rec_entry *enq; + struct l_wait_info lwi = { 0 }; + size_t len; + int rc; + ENTRY; + + LASSERT(crs != NULL); + LASSERT(hdr != NULL); + + rec = container_of(hdr, struct llog_changelog_rec, cr_hdr); + + if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { + rc = -EINVAL; + CERROR("%s: not a changelog rec %x/%d in llog "DFID" rc = %d\n", + crs->crs_obd->obd_name, rec->cr_hdr.lrh_type, + rec->cr.cr_type, + PFID(lu_object_fid(&llh->lgh_obj->do_lu)), rc); + RETURN(rc); + } + + /* Skip undesired records */ + if (rec->cr.cr_index < crs->crs_start_offset) + RETURN(0); + + CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n", + rec->cr.cr_index, rec->cr.cr_type, + changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, + rec->cr.cr_flags & CLF_FLAGMASK, + PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), + rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); + + l_wait_event(crs->crs_waitq_prod, + (crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH || + crs->crs_closed), &lwi); + + if (crs->crs_closed) + RETURN(LLOG_PROC_BREAK); + + len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; + OBD_ALLOC(enq, sizeof(*enq) + len); + if (enq == NULL) + RETURN(-ENOMEM); + + INIT_LIST_HEAD(&enq->enq_linkage); + enq->enq_length = len; + memcpy(enq->enq_record, &rec->cr, len); + + mutex_lock(&crs->crs_lock); + list_add_tail(&enq->enq_linkage, &crs->crs_rec_queue); + crs->crs_rec_count++; + mutex_unlock(&crs->crs_lock); + + wake_up_all(&crs->crs_waitq_cons); + + RETURN(0); +} + +/** + * Remove record from the list it is attached to and free it. + */ +static void enq_record_delete(struct chlg_rec_entry *rec) +{ + list_del(&rec->enq_linkage); + OBD_FREE(rec, sizeof(*rec) + rec->enq_length); +} + +/** + * Release resources associated to a changelog_reader_state instance. + * + * @param crs CRS instance to release. + */ +static void crs_free(struct chlg_reader_state *crs) +{ + struct chlg_rec_entry *rec; + struct chlg_rec_entry *tmp; + + list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) + enq_record_delete(rec); + + OBD_FREE_PTR(crs); +} + +/** + * Record prefetch thread entry point. Opens the changelog catalog and starts + * reading records. + * + * @param[in,out] args chlg_reader_state passed from caller. + * @return 0 on success, negated error code on failure. + */ +static int chlg_load(void *args) +{ + struct chlg_reader_state *crs = args; + struct obd_device *obd = crs->crs_obd; + struct llog_ctxt *ctx = NULL; + struct llog_handle *llh = NULL; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + ctx = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctx == NULL) + GOTO(err_out, rc = -ENOENT); + + rc = llog_open(NULL, ctx, &llh, NULL, CHANGELOG_CATALOG, + LLOG_OPEN_EXISTS); + if (rc) { + CERROR("%s: fail to open changelog catalog: rc = %d\n", + obd->obd_name, rc); + GOTO(err_out, rc); + } + + rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT|LLOG_F_EXT_JOBID, NULL); + if (rc) { + CERROR("%s: fail to init llog handle: rc = %d\n", + obd->obd_name, rc); + GOTO(err_out, rc); + } + + rc = llog_cat_process(NULL, llh, chlg_read_cat_process_cb, crs, 0, 0); + if (rc < 0) { + CERROR("%s: fail to process llog: rc = %d\n", obd->obd_name, rc); + GOTO(err_out, rc); + } + +err_out: + crs->crs_err = true; + wake_up_all(&crs->crs_waitq_cons); + + if (llh != NULL) + llog_cat_close(NULL, llh); + + if (ctx != NULL) + llog_ctxt_put(ctx); + + l_wait_event(crs->crs_waitq_prod, crs->crs_closed, &lwi); + crs_free(crs); + RETURN(rc); +} + +/** + * Read handler, dequeues records from the chlg_reader_state if any. + * No partial records are copied to userland so this function can return less + * data than required (short read). + * + * @param[in] file File pointer to the character device. + * @param[out] buff Userland buffer where to copy the records. + * @param[in] count Userland buffer size. + * @param[out] ppos File position, updated with the index number of the next + * record to read. + * @return number of copied bytes on success, negated error code on failure. + */ +static ssize_t chlg_read(struct file *file, char __user *buff, size_t count, + loff_t *ppos) +{ + struct chlg_reader_state *crs = file->private_data; + struct chlg_rec_entry *rec; + struct chlg_rec_entry *tmp; + struct l_wait_info lwi = { 0 }; + ssize_t written_total = 0; + LIST_HEAD(consumed); + ENTRY; + + if (file->f_flags & O_NONBLOCK && crs->crs_rec_count == 0) + RETURN(-EAGAIN); + + l_wait_event(crs->crs_waitq_cons, + crs->crs_rec_count > 0 || crs->crs_eof || crs->crs_err, + &lwi); + + mutex_lock(&crs->crs_lock); + list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) { + if (written_total + rec->enq_length > count) + break; + + if (copy_to_user(buff, rec->enq_record, rec->enq_length)) { + if (written_total == 0) + written_total = -EFAULT; + break; + } + + buff += rec->enq_length; + written_total += rec->enq_length; + + crs->crs_rec_count--; + list_move_tail(&rec->enq_linkage, &consumed); + + crs->crs_start_offset = rec->enq_record->cr_index + 1; + } + mutex_unlock(&crs->crs_lock); + + if (written_total > 0) + wake_up_all(&crs->crs_waitq_prod); + + list_for_each_entry_safe(rec, tmp, &consumed, enq_linkage) + enq_record_delete(rec); + + *ppos = crs->crs_start_offset; + + RETURN(written_total); +} + +/** + * Jump to a given record index. Helper for chlg_llseek(). + * + * @param[in,out] crs Internal reader state. + * @param[in] offset Desired offset (index record). + * @return 0 on success, negated error code on failure. + */ +static int chlg_set_start_offset(struct chlg_reader_state *crs, __u64 offset) +{ + struct chlg_rec_entry *rec; + struct chlg_rec_entry *tmp; + + mutex_lock(&crs->crs_lock); + if (offset < crs->crs_start_offset) { + mutex_unlock(&crs->crs_lock); + return -ERANGE; + } + + crs->crs_start_offset = offset; + list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage) { + struct changelog_rec *cr = rec->enq_record; + + if (cr->cr_index >= crs->crs_start_offset) + break; + + crs->crs_rec_count--; + enq_record_delete(rec); + } + + mutex_unlock(&crs->crs_lock); + wake_up_all(&crs->crs_waitq_prod); + return 0; +} + +/** + * Move read pointer to a certain record index, encoded as an offset. + * + * @param[in,out] file File pointer to the changelog character device + * @param[in] off Offset to skip, actually a record index, not byte count + * @param[in] whence Relative/Absolute interpretation of the offset + * @return the resulting position on success or negated error code on failure. + */ +static loff_t chlg_llseek(struct file *file, loff_t off, int whence) +{ + struct chlg_reader_state *crs = file->private_data; + loff_t pos; + int rc; + + switch (whence) { + case SEEK_SET: + pos = off; + break; + case SEEK_CUR: + pos = file->f_pos + off; + break; + case SEEK_END: + default: + return -EINVAL; + } + + /* We cannot go backward */ + if (pos < file->f_pos) + return -EINVAL; + + rc = chlg_set_start_offset(crs, pos); + if (rc != 0) + return rc; + + file->f_pos = pos; + return pos; +} + +/** + * Clear record range for a given changelog reader. + * + * @param[in] crs Current internal state. + * @param[in] reader Changelog reader ID (cl1, cl2...) + * @param[in] record Record index up which to clear + * @return 0 on success, negated error code on failure. + */ +static int chlg_clear(struct chlg_reader_state *crs, __u32 reader, __u64 record) +{ + struct obd_device *obd = crs->crs_obd; + struct changelog_setinfo cs = { + .cs_recno = record, + .cs_id = reader + }; + + return obd_set_info_async(NULL, obd->obd_self_export, + strlen(KEY_CHANGELOG_CLEAR), + KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL); +} + +/** Maximum changelog control command size */ +#define CHLG_CONTROL_CMD_MAX 64 + +/** + * Handle writes() into the changelog character device. Write() can be used + * to request special control operations. + * + * @param[in] file File pointer to the changelog character device + * @param[in] buff User supplied data (written data) + * @param[in] count Number of written bytes + * @param[in] off (unused) + * @return number of written bytes on success, negated error code on failure. + */ +static ssize_t chlg_write(struct file *file, const char __user *buff, + size_t count, loff_t *off) +{ + struct chlg_reader_state *crs = file->private_data; + char *kbuf; + __u64 record; + __u32 reader; + int rc = 0; + ENTRY; + + if (count > CHLG_CONTROL_CMD_MAX) + RETURN(-EINVAL); + + OBD_ALLOC(kbuf, CHLG_CONTROL_CMD_MAX); + if (kbuf == NULL) + RETURN(-ENOMEM); + + if (copy_from_user(kbuf, buff, count)) + GOTO(out_kbuf, rc = -EFAULT); + + kbuf[CHLG_CONTROL_CMD_MAX - 1] = '\0'; + + if (sscanf(kbuf, "clear:cl%u:%llu", &reader, &record) == 2) + rc = chlg_clear(crs, reader, record); + else + rc = -EINVAL; + + EXIT; +out_kbuf: + OBD_FREE(kbuf, CHLG_CONTROL_CMD_MAX); + return rc < 0 ? rc : count; +} + +/** + * Find the OBD device associated to a changelog character device. + * @param[in] cdev character device instance descriptor + * @return corresponding OBD device or NULL if none was found. + */ +static struct obd_device *chlg_obd_get(dev_t cdev) +{ + int minor = MINOR(cdev); + struct obd_device *obd = NULL; + struct chlg_registered_dev *curr; + + mutex_lock(&chlg_registered_dev_lock); + list_for_each_entry(curr, &chlg_registered_devices, ced_link) { + if (curr->ced_misc.minor == minor) { + /* take the first available OBD device attached */ + obd = list_first_entry(&curr->ced_obds, + struct obd_device, + u.cli.cl_chg_dev_linkage); + break; + } + } + mutex_unlock(&chlg_registered_dev_lock); + return obd; +} + +/** + * Open handler, initialize internal CRS state and spawn prefetch thread if + * needed. + * @param[in] inode Inode struct for the open character device. + * @param[in] file Corresponding file pointer. + * @return 0 on success, negated error code on failure. + */ +static int chlg_open(struct inode *inode, struct file *file) +{ + struct chlg_reader_state *crs; + struct obd_device *obd = chlg_obd_get(inode->i_rdev); + struct task_struct *task; + int rc; + ENTRY; + + if (!obd) + RETURN(-ENODEV); + + OBD_ALLOC_PTR(crs); + if (!crs) + RETURN(-ENOMEM); + + crs->crs_obd = obd; + crs->crs_err = false; + crs->crs_eof = false; + crs->crs_closed = false; + + mutex_init(&crs->crs_lock); + INIT_LIST_HEAD(&crs->crs_rec_queue); + init_waitqueue_head(&crs->crs_waitq_prod); + init_waitqueue_head(&crs->crs_waitq_cons); + + if (file->f_mode & FMODE_READ) { + task = kthread_run(chlg_load, crs, "chlg_load_thread"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("%s: cannot start changelog thread: rc = %d\n", + obd->obd_name, rc); + GOTO(err_crs, rc); + } + } + + file->private_data = crs; + RETURN(0); + +err_crs: + OBD_FREE_PTR(crs); + return rc; +} + +/** + * Close handler, release resources. + * + * @param[in] inode Inode struct for the open character device. + * @param[in] file Corresponding file pointer. + * @return 0 on success, negated error code on failure. + */ +static int chlg_release(struct inode *inode, struct file *file) +{ + struct chlg_reader_state *crs = file->private_data; + + if (file->f_mode & FMODE_READ) { + crs->crs_closed = true; + wake_up_all(&crs->crs_waitq_prod); + } else { + /* No producer thread, release resource ourselves */ + crs_free(crs); + } + return 0; +} + +/** + * Poll handler, indicates whether the device is readable (new records) and + * writable (always). + * + * @param[in] file Device file pointer. + * @param[in] wait (opaque) + * @return combination of the poll status flags. + */ +static unsigned int chlg_poll(struct file *file, poll_table *wait) +{ + struct chlg_reader_state *crs = file->private_data; + unsigned int mask = 0; + + mutex_lock(&crs->crs_lock); + poll_wait(file, &crs->crs_waitq_cons, wait); + if (crs->crs_rec_count > 0) + mask |= POLLIN | POLLRDNORM; + if (crs->crs_err) + mask |= POLLERR; + if (crs->crs_eof) + mask |= POLLHUP; + mutex_unlock(&crs->crs_lock); + return mask; +} + +static const struct file_operations chlg_fops = { + .owner = THIS_MODULE, + .llseek = chlg_llseek, + .read = chlg_read, + .write = chlg_write, + .open = chlg_open, + .release = chlg_release, + .poll = chlg_poll, +}; + +/** + * This uses obd_name of the form: "testfs-MDT0000-mdc-ffff88006501600" + * and returns a name of the form: "changelog-testfs-MDT0000". + */ +static void get_chlg_name(char *name, size_t name_len, struct obd_device *obd) +{ + int i; + + snprintf(name, name_len, "changelog-%s", obd->obd_name); + + /* Find the 2nd '-' from the end and truncate on it */ + for (i = 0; i < 2; i++) { + char *p = strrchr(name, '-'); + + if (p == NULL) + return; + *p = '\0'; + } +} + +/** + * Find a changelog character device by name. + * All devices registered during MDC setup are listed in a global list with + * their names attached. + */ +static struct chlg_registered_dev * +chlg_registered_dev_find_by_name(const char *name) +{ + struct chlg_registered_dev *dit; + + list_for_each_entry(dit, &chlg_registered_devices, ced_link) + if (strcmp(name, dit->ced_name) == 0) + return dit; + return NULL; +} + +/** + * Find chlg_registered_dev structure for a given OBD device. + * This is bad O(n^2) but for each filesystem: + * - N is # of MDTs times # of mount points + * - this only runs at shutdown + */ +static struct chlg_registered_dev * +chlg_registered_dev_find_by_obd(const struct obd_device *obd) +{ + struct chlg_registered_dev *dit; + struct obd_device *oit; + + list_for_each_entry(dit, &chlg_registered_devices, ced_link) + list_for_each_entry(oit, &dit->ced_obds, + u.cli.cl_chg_dev_linkage) + if (oit == obd) + return dit; + return NULL; +} + +/** + * Changelog character device initialization. + * Register a misc character device with a dynamic minor number, under a name + * of the form: 'changelog-fsname-MDTxxxx'. Reference this OBD device with it. + * + * @param[in] obd This MDC obd_device. + * @return 0 on success, negated error code on failure. + */ +int mdc_changelog_cdev_init(struct obd_device *obd) +{ + struct chlg_registered_dev *exist; + struct chlg_registered_dev *entry; + int rc; + ENTRY; + + OBD_ALLOC_PTR(entry); + if (entry == NULL) + RETURN(-ENOMEM); + + get_chlg_name(entry->ced_name, sizeof(entry->ced_name), obd); + + entry->ced_misc.minor = MISC_DYNAMIC_MINOR; + entry->ced_misc.name = entry->ced_name; + entry->ced_misc.fops = &chlg_fops; + + kref_init(&entry->ced_refs); + INIT_LIST_HEAD(&entry->ced_obds); + INIT_LIST_HEAD(&entry->ced_link); + + mutex_lock(&chlg_registered_dev_lock); + exist = chlg_registered_dev_find_by_name(entry->ced_name); + if (exist != NULL) { + kref_get(&exist->ced_refs); + list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &exist->ced_obds); + GOTO(out_unlock, rc = 0); + } + + /* Register new character device */ + rc = misc_register(&entry->ced_misc); + if (rc != 0) + GOTO(out_unlock, rc); + + list_add_tail(&obd->u.cli.cl_chg_dev_linkage, &entry->ced_obds); + list_add_tail(&entry->ced_link, &chlg_registered_devices); + + entry = NULL; /* prevent it from being freed below */ + +out_unlock: + mutex_unlock(&chlg_registered_dev_lock); + if (entry) + OBD_FREE_PTR(entry); + RETURN(rc); +} + +/** + * Deregister a changelog character device whose refcount has reached zero. + */ +static void chlg_dev_clear(struct kref *kref) +{ + struct chlg_registered_dev *entry = container_of(kref, + struct chlg_registered_dev, + ced_refs); + ENTRY; + + list_del(&entry->ced_link); + misc_deregister(&entry->ced_misc); + OBD_FREE_PTR(entry); + EXIT; +} + +/** + * Release OBD, decrease reference count of the corresponding changelog device. + */ +void mdc_changelog_cdev_finish(struct obd_device *obd) +{ + struct chlg_registered_dev *dev = chlg_registered_dev_find_by_obd(obd); + ENTRY; + + mutex_lock(&chlg_registered_dev_lock); + list_del_init(&obd->u.cli.cl_chg_dev_linkage); + kref_put(&dev->ced_refs, chlg_dev_clear); + mutex_unlock(&chlg_registered_dev_lock); + EXIT; +} diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index e723b55..47c55db 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -139,6 +139,11 @@ enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags, union ldlm_policy_data *policy, enum ldlm_mode mode, struct lustre_handle *lockh); + +int mdc_changelog_cdev_init(struct obd_device *obd); + +void mdc_changelog_cdev_finish(struct obd_device *obd); + static inline int mdc_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, int opc, struct list_head *cancels, int count) diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 2c45545..acae8a8 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -34,7 +34,6 @@ #include #include -#include #include #include #include @@ -1794,171 +1793,6 @@ out: return rc; } -static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, __u32 flags) -{ - struct kuc_hdr *lh = (struct kuc_hdr *)buf; - - LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE); - - lh->kuc_magic = KUC_MAGIC; - lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; - lh->kuc_flags = flags; - lh->kuc_msgtype = CL_RECORD; - lh->kuc_msglen = len; - return lh; -} - -struct changelog_show { - __u64 cs_startrec; - enum changelog_send_flag cs_flags; - struct file *cs_fp; - char *cs_buf; - struct obd_device *cs_obd; -}; - -static inline char *cs_obd_name(struct changelog_show *cs) -{ - return cs->cs_obd->obd_name; -} - -static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, - struct llog_rec_hdr *hdr, void *data) -{ - struct changelog_show *cs = data; - struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; - struct kuc_hdr *lh; - size_t len; - int rc; - ENTRY; - - if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { - rc = -EINVAL; - CERROR("%s: not a changelog rec %x/%d: rc = %d\n", - cs_obd_name(cs), rec->cr_hdr.lrh_type, - rec->cr.cr_type, rc); - RETURN(rc); - } - - if (rec->cr.cr_index < cs->cs_startrec) { - /* Skip entries earlier than what we are interested in */ - CDEBUG(D_HSM, "rec=%llu start=%llu\n", - rec->cr.cr_index, cs->cs_startrec); - RETURN(0); - } - - CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n", - rec->cr.cr_index, rec->cr.cr_type, - changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, - rec->cr.cr_flags & CLF_FLAGMASK, - PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), - rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); - - len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; - - /* Set up the message */ - lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags); - memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); - - rc = libcfs_kkuc_msg_put(cs->cs_fp, lh); - CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc); - - RETURN(rc); -} - -static int mdc_changelog_send_thread(void *csdata) -{ - struct changelog_show *cs = csdata; - struct llog_ctxt *ctxt = NULL; - struct llog_handle *llh = NULL; - struct kuc_hdr *kuch; - enum llog_flag flags = LLOG_F_IS_CAT; - int rc; - - CDEBUG(D_HSM, "changelog to fp=%p start %llu\n", - cs->cs_fp, cs->cs_startrec); - - OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); - if (cs->cs_buf == NULL) - GOTO(out, rc = -ENOMEM); - - /* Set up the remote catalog handle */ - ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt == NULL) - GOTO(out, rc = -ENOENT); - rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG, - LLOG_OPEN_EXISTS); - if (rc) { - CERROR("%s: fail to open changelog catalog: rc = %d\n", - cs_obd_name(cs), rc); - GOTO(out, rc); - } - - if (cs->cs_flags & CHANGELOG_FLAG_JOBID) - flags |= LLOG_F_EXT_JOBID; - - rc = llog_init_handle(NULL, llh, flags, NULL); - if (rc) { - CERROR("llog_init_handle failed %d\n", rc); - GOTO(out, rc); - } - - rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0); - - /* Send EOF no matter what our result */ - kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags); - kuch->kuc_msgtype = CL_EOF; - libcfs_kkuc_msg_put(cs->cs_fp, kuch); - -out: - fput(cs->cs_fp); - if (llh) - llog_cat_close(NULL, llh); - if (ctxt) - llog_ctxt_put(ctxt); - if (cs->cs_buf) - OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); - OBD_FREE_PTR(cs); - return rc; -} - -static int mdc_ioc_changelog_send(struct obd_device *obd, - struct ioc_changelog *icc) -{ - struct changelog_show *cs; - struct task_struct *task; - int rc; - - /* Freed in mdc_changelog_send_thread */ - OBD_ALLOC_PTR(cs); - if (!cs) - return -ENOMEM; - - cs->cs_obd = obd; - cs->cs_startrec = icc->icc_recno; - /* matching fput in mdc_changelog_send_thread */ - cs->cs_fp = fget(icc->icc_id); - cs->cs_flags = icc->icc_flags; - - /* - * New thread because we should return to user app before - * writing into our pipe - */ - task = kthread_run(mdc_changelog_send_thread, cs, - "mdc_clg_send_thread"); - if (IS_ERR(task)) { - rc = PTR_ERR(task); - CERROR("%s: cannot start changelog thread: rc = %d\n", - cs_obd_name(cs), rc); - OBD_FREE_PTR(cs); - } else { - rc = 0; - CDEBUG(D_HSM, "%s: started changelog thread\n", - cs_obd_name(cs)); - } - - return rc; -} - static int mdc_ioc_hsm_ct_start(struct obd_export *exp, struct lustre_kernelcomm *lk); @@ -2058,30 +1892,18 @@ out: static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obd = exp->exp_obd; - struct obd_ioctl_data *data = karg; - struct obd_import *imp = obd->u.cli.cl_import; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct obd_ioctl_data *data = karg; + struct obd_import *imp = obd->u.cli.cl_import; + int rc; + ENTRY; if (!try_module_get(THIS_MODULE)) { CERROR("%s: cannot get module '%s'\n", obd->obd_name, module_name(THIS_MODULE)); return -EINVAL; } - switch (cmd) { - case OBD_IOC_CHANGELOG_SEND: - rc = mdc_ioc_changelog_send(obd, karg); - GOTO(out, rc); - case OBD_IOC_CHANGELOG_CLEAR: { - struct ioc_changelog *icc = karg; - struct changelog_setinfo cs = - {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id}; - rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR), - KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, - NULL); - GOTO(out, rc); - } + switch (cmd) { case OBD_IOC_FID2PATH: rc = mdc_ioc_fid2path(exp, karg); GOTO(out, rc); @@ -2106,31 +1928,31 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case LL_IOC_HSM_REQUEST: rc = mdc_ioc_hsm_request(exp, karg); GOTO(out, rc); - case OBD_IOC_CLIENT_RECOVER: - rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); - if (rc < 0) - GOTO(out, rc); - GOTO(out, rc = 0); - case IOC_OSC_SET_ACTIVE: - rc = ptlrpc_set_import_active(imp, data->ioc_offset); - GOTO(out, rc); - case OBD_IOC_PING_TARGET: - rc = ptlrpc_obd_ping(obd); - GOTO(out, rc); - /* - * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by - * LMV instead of MDC. But when the cluster is upgraded from 1.8, - * there'd be no LMV layer thus we might be called here. Eventually - * this code should be removed. - * bz20731, LU-592. - */ - case IOC_OBD_STATFS: { - struct obd_statfs stat_buf = {0}; + case OBD_IOC_CLIENT_RECOVER: + rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); + if (rc < 0) + GOTO(out, rc); + GOTO(out, rc = 0); + case IOC_OSC_SET_ACTIVE: + rc = ptlrpc_set_import_active(imp, data->ioc_offset); + GOTO(out, rc); + case OBD_IOC_PING_TARGET: + rc = ptlrpc_obd_ping(obd); + GOTO(out, rc); + /* + * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by + * LMV instead of MDC. But when the cluster is upgraded from 1.8, + * there'd be no LMV layer thus we might be called here. Eventually + * this code should be removed. + * bz20731, LU-592. + */ + case IOC_OBD_STATFS: { + struct obd_statfs stat_buf = {0}; - if (*((__u32 *) data->ioc_inlbuf2) != 0) - GOTO(out, rc = -ENODEV); + if (*((__u32 *) data->ioc_inlbuf2) != 0) + GOTO(out, rc = -ENODEV); - /* copy UUID */ + /* copy UUID */ if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), min((int)data->ioc_plen2, (int)sizeof(struct obd_uuid)))) @@ -2143,12 +1965,12 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, GOTO(out, rc); if (copy_to_user(data->ioc_pbuf1, &stat_buf, - min((int) data->ioc_plen1, - (int) sizeof(stat_buf)))) - GOTO(out, rc = -EFAULT); + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) + GOTO(out, rc = -EFAULT); - GOTO(out, rc = 0); - } + GOTO(out, rc = 0); + } case OBD_IOC_QUOTACTL: { struct if_quotactl *qctl = karg; struct obd_quotactl *oqctl; @@ -2663,16 +2485,28 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) rc = mdc_llog_init(obd); if (rc) { - mdc_cleanup(obd); - CERROR("failed to setup llogging subsystems\n"); - RETURN(rc); + CERROR("%s: failed to setup llogging subsystems: rc = %d\n", + obd->obd_name, rc); + GOTO(err_mdc_cleanup, rc); } - RETURN(rc); + rc = mdc_changelog_cdev_init(obd); + if (rc) { + CERROR("%s: failed to setup changelog char device: rc = %d\n", + obd->obd_name, rc); + GOTO(err_mdc_cleanup, rc); + } + + EXIT; +err_mdc_cleanup: + if (rc) + client_obd_cleanup(obd); err_ptlrpcd_decref: - ptlrpcd_decref(); - RETURN(rc); + if (rc) + ptlrpcd_decref(); + + return rc; } /* Initialize the default and maximum LOV EA sizes. This allows @@ -2707,6 +2541,8 @@ static int mdc_precleanup(struct obd_device *obd) if (obd->obd_type->typ_refcnt <= 1) libcfs_kkuc_group_rem(0, KUC_GRP_HSM); + mdc_changelog_cdev_finish(obd); + obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); diff --git a/lustre/utils/Makefile.am b/lustre/utils/Makefile.am index 9e42a93..4e3d03c 100644 --- a/lustre/utils/Makefile.am +++ b/lustre/utils/Makefile.am @@ -96,7 +96,7 @@ liblustreapitmp_a_SOURCES = liblustreapi.c liblustreapi_hsm.c \ liblustreapi_kernelconn.c liblustreapi_param.c \ $(top_builddir)/libcfs/libcfs/util/string.c \ $(top_builddir)/libcfs/libcfs/util/param.c \ - liblustreapi_ladvise.c + liblustreapi_ladvise.c liblustreapi_chlg.c if UTILS # build static and shared lib lustreapi liblustreapi.a : liblustreapitmp.a diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index cf27a9b..fd0256d 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -3662,213 +3662,6 @@ int root_ioctl(const char *mdtname, int opc, void *data, int *mdtidxp, return rc; } -/****** Changelog API ********/ - -static int changelog_ioctl(const char *mdtname, int opc, int id, - long long recno, int flags) -{ - struct ioc_changelog data; - int *idx; - - data.icc_id = id; - data.icc_recno = recno; - data.icc_flags = flags; - idx = (int *)(&data.icc_mdtindex); - - return root_ioctl(mdtname, opc, &data, idx, WANT_ERROR); -} - -#define CHANGELOG_PRIV_MAGIC 0xCA8E1080 -struct changelog_private { - int magic; - enum changelog_send_flag flags; - struct lustre_kernelcomm kuc; -}; - -/** Start reading from a changelog - * @param priv Opaque private control structure - * @param flags Start flags (e.g. CHANGELOG_FLAG_BLOCK) - * @param device Report changes recorded on this MDT - * @param startrec Report changes beginning with this record number - * (just call llapi_changelog_fini when done; don't need an endrec) - */ -int llapi_changelog_start(void **priv, enum changelog_send_flag flags, - const char *device, long long startrec) -{ - struct changelog_private *cp; - static bool warned; - int rc; - - /* Set up the receiver control struct */ - cp = calloc(1, sizeof(*cp)); - if (cp == NULL) - return -ENOMEM; - - cp->magic = CHANGELOG_PRIV_MAGIC; - cp->flags = flags; - - /* Set up the receiver */ - rc = libcfs_ukuc_start(&cp->kuc, 0 /* no group registration */, 0); - if (rc < 0) - goto out_free; - - *priv = cp; - - /* CHANGELOG_FLAG_JOBID will eventually become mandatory. Display a - * warning if it's missing. */ - if (!(flags & CHANGELOG_FLAG_JOBID) && !warned) { - llapi_err_noerrno(LLAPI_MSG_WARN, "warning: %s() called " - "w/o CHANGELOG_FLAG_JOBID", __func__); - warned = true; - } - - /* Tell the kernel to start sending */ - rc = changelog_ioctl(device, OBD_IOC_CHANGELOG_SEND, cp->kuc.lk_wfd, - startrec, flags); - /* Only the kernel reference keeps the write side open */ - close(cp->kuc.lk_wfd); - cp->kuc.lk_wfd = LK_NOFD; - if (rc < 0) { - /* frees and clears priv */ - llapi_changelog_fini(priv); - return rc; - } - - return 0; - -out_free: - free(cp); - return rc; -} - -/** Finish reading from a changelog */ -int llapi_changelog_fini(void **priv) -{ - struct changelog_private *cp = (struct changelog_private *)*priv; - - if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC)) - return -EINVAL; - - libcfs_ukuc_stop(&cp->kuc); - free(cp); - *priv = NULL; - return 0; -} - -/** - * Convert all records to a same format according to the caller's wishes. - * Default is CLF_VERSION | CLF_RENAME. - * Add CLF_JOBID if explicitely requested. - * - * \param rec The record to remap. It is expected to be big enough to - * properly handle the final format. - * \return 1 if anything changed. 0 otherwise. - */ -/** Read the next changelog entry - * @param priv Opaque private control structure - * @param rech Changelog record handle; record will be allocated here - * @return 0 valid message received; rec is set - * <0 error code - * 1 EOF - */ -#define DEFAULT_RECORD_FMT (CLF_VERSION | CLF_RENAME) -int llapi_changelog_recv(void *priv, struct changelog_rec **rech) -{ - struct changelog_private *cp = (struct changelog_private *)priv; - struct kuc_hdr *kuch; - enum changelog_rec_flags rec_fmt = DEFAULT_RECORD_FMT; - int rc = 0; - - if (!cp || (cp->magic != CHANGELOG_PRIV_MAGIC)) - return -EINVAL; - if (rech == NULL) - return -EINVAL; - kuch = malloc(KUC_CHANGELOG_MSG_MAXSIZE); - if (kuch == NULL) - return -ENOMEM; - - if (cp->flags & CHANGELOG_FLAG_JOBID) - rec_fmt |= CLF_JOBID; - -repeat: - rc = libcfs_ukuc_msg_get(&cp->kuc, (char *)kuch, - KUC_CHANGELOG_MSG_MAXSIZE, - KUC_TRANSPORT_CHANGELOG); - if (rc < 0) - goto out_free; - - if ((kuch->kuc_transport != KUC_TRANSPORT_CHANGELOG) || - ((kuch->kuc_msgtype != CL_RECORD) && - (kuch->kuc_msgtype != CL_EOF))) { - llapi_err_noerrno(LLAPI_MSG_ERROR, - "Unknown changelog message type %d:%d\n", - kuch->kuc_transport, kuch->kuc_msgtype); - rc = -EPROTO; - goto out_free; - } - - if (kuch->kuc_msgtype == CL_EOF) { - if (cp->flags & CHANGELOG_FLAG_FOLLOW) { - /* Ignore EOFs */ - goto repeat; - } else { - rc = 1; - goto out_free; - } - } - - /* Our message is a changelog_rec. Use pointer math to skip - * kuch_hdr and point directly to the message payload. */ - *rech = (struct changelog_rec *)(kuch + 1); - changelog_remap_rec(*rech, rec_fmt); - - return 0; - -out_free: - *rech = NULL; - free(kuch); - return rc; -} - -/** Release the changelog record when done with it. */ -int llapi_changelog_free(struct changelog_rec **rech) -{ - if (*rech) { - /* We allocated memory starting at the kuc_hdr, but passed - * the consumer a pointer to the payload. - * Use pointer math to get back to the header. - */ - struct kuc_hdr *kuch = (struct kuc_hdr *)*rech - 1; - free(kuch); - } - *rech = NULL; - return 0; -} - -int llapi_changelog_clear(const char *mdtname, const char *idstr, - long long endrec) -{ - long id; - - if (endrec < 0) { - llapi_err_noerrno(LLAPI_MSG_ERROR, - "can't purge negative records\n"); - return -EINVAL; - } - - id = strtol(idstr + strlen(CHANGELOG_USER_PREFIX), NULL, 10); - if ((id == 0) || (strncmp(idstr, CHANGELOG_USER_PREFIX, - strlen(CHANGELOG_USER_PREFIX)) != 0)) { - llapi_err_noerrno(LLAPI_MSG_ERROR, - "expecting id of the form '" - CHANGELOG_USER_PREFIX - "'; got '%s'\n", idstr); - return -EINVAL; - } - - return changelog_ioctl(mdtname, OBD_IOC_CHANGELOG_CLEAR, id, endrec, 0); -} - int llapi_fid2path(const char *device, const char *fidstr, char *buf, int buflen, long long *recno, int *linkno) { diff --git a/lustre/utils/liblustreapi_chlg.c b/lustre/utils/liblustreapi_chlg.c new file mode 100644 index 0000000..f4661be --- /dev/null +++ b/lustre/utils/liblustreapi_chlg.c @@ -0,0 +1,311 @@ +/* + * LGPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * (C) Copyright 2017 Commissariat a l'energie atomique et aux energies + * alternatives + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the GNU Lesser General Public License + * (LGPL) version 2.1 or (at your discretion) any later version. + * (LGPL) version 2.1 accompanies this distribution, and is available at + * http://www.gnu.org/licenses/lgpl-2.1.html + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * LGPL HEADER END + */ +/* + * lustre/utils/liblustreapi_chlg.c + * + * lustreapi library for filesystem changelog + * + * Author: Henri Doreau + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +static int chlg_dev_path(char *path, size_t path_len, const char *device) +{ + int rc; + + rc = snprintf(path, path_len, "/dev/changelog-%s", device); + if (rc < 0) + return -EIO; + + if (rc >= path_len) + return -EOVERFLOW; + + return 0; +} + +#define CHANGELOG_PRIV_MAGIC 0xCA8E1080 +#define CHANGELOG_BUFFER_SZ 4096 + +/** + * Record state for efficient changelog consumption. + * Read chunks of CHANGELOG_BUFFER_SZ bytes. + */ +struct changelog_private { + /* Ensure that the structure is valid and initialized */ + int clp_magic; + /* File descriptor on the changelog character device */ + int clp_fd; + /* Changelog delivery mode */ + enum changelog_send_flag clp_send_flags; + /* Available bytes in buffer */ + size_t clp_buf_len; + /* Current position in buffer */ + char *clp_buf_pos; + /* Read buffer with records read from system */ + char clp_buf[0]; +}; + +/** + * Start reading from a changelog + * + * @param priv Opaque private control structure + * @param flags Start flags (e.g. CHANGELOG_FLAG_JOBID) + * @param device Report changes recorded on this MDT + * @param startrec Report changes beginning with this record number + * (just call llapi_changelog_fini when done; don't need an endrec) + */ +int llapi_changelog_start(void **priv, enum changelog_send_flag flags, + const char *device, long long startrec) +{ + struct changelog_private *cp; + static bool warned_jobid; + static bool warned_follow; + char cdev_path[PATH_MAX]; + int rc; + + rc = chlg_dev_path(cdev_path, sizeof(cdev_path), device); + if (rc != 0) + return rc; + + /* Set up the receiver control struct */ + cp = calloc(1, sizeof(*cp) + CHANGELOG_BUFFER_SZ); + if (cp == NULL) + return -ENOMEM; + + cp->clp_magic = CHANGELOG_PRIV_MAGIC; + cp->clp_send_flags = flags; + + cp->clp_buf_len = 0; + cp->clp_buf_pos = cp->clp_buf; + + /* Set up the receiver */ + cp->clp_fd = open(cdev_path, O_RDONLY); + if (cp->clp_fd < 0) { + rc = -errno; + goto out_free_cp; + } + + if (startrec != 0) { + rc = lseek(cp->clp_fd, startrec, SEEK_SET); + if (rc < 0) { + rc = -errno; + goto out_close; + } + } + + *priv = cp; + + /* CHANGELOG_FLAG_JOBID will eventually become mandatory. Display a + * warning if it's missing. */ + if (!(flags & CHANGELOG_FLAG_JOBID) && !warned_jobid) { + llapi_err_noerrno(LLAPI_MSG_WARN, "warning: %s() called " + "without CHANGELOG_FLAG_JOBID", __func__); + warned_jobid = true; + } + + /* Behavior expected by CHANGELOG_FLAG_FOLLOW is not implemented, warn + * the user and ignore it. */ + if (flags & CHANGELOG_FLAG_FOLLOW && !warned_follow) { + llapi_err_noerrno(LLAPI_MSG_WARN, "warning: %s() called with " + "CHANGELOG_FLAG_FOLLOW (ignored)", __func__); + warned_follow = true; + } + + return 0; + +out_close: + close(cp->clp_fd); +out_free_cp: + free(cp); + return rc; +} + +/** Finish reading from a changelog */ +int llapi_changelog_fini(void **priv) +{ + struct changelog_private *cp = *priv; + + if (!cp || (cp->clp_magic != CHANGELOG_PRIV_MAGIC)) + return -EINVAL; + + close(cp->clp_fd); + free(cp); + *priv = NULL; + return 0; +} + +static ssize_t chlg_read_bulk(struct changelog_private *cp) +{ + ssize_t rd_bytes; + + if (!cp || cp->clp_magic != CHANGELOG_PRIV_MAGIC) + return -EINVAL; + + rd_bytes = read(cp->clp_fd, cp->clp_buf, CHANGELOG_BUFFER_SZ); + if (rd_bytes < 0) + return -errno; + + cp->clp_buf_pos = cp->clp_buf; + cp->clp_buf_len = rd_bytes; + + return rd_bytes; +} + +/** + * Returns a file descriptor to poll on. + * + * \@param[in] priv Opaque changelog reader structure. + * @return valid file descriptor on success, negated errno code on failure. + */ +int llapi_changelog_get_fd(void *priv) +{ + struct changelog_private *cp = priv; + + if (!cp || cp->clp_magic != CHANGELOG_PRIV_MAGIC) + return -EINVAL; + + return cp->clp_fd; +} + +/** Read the next changelog entry + * @param priv Opaque private control structure + * @param rech Changelog record handle; record will be allocated here + * @return 0 valid message received; rec is set + * <0 error code + * 1 EOF + */ +#define DEFAULT_RECORD_FMT (CLF_VERSION | CLF_RENAME) +int llapi_changelog_recv(void *priv, struct changelog_rec **rech) +{ + struct changelog_private *cp = priv; + enum changelog_rec_flags rec_fmt = DEFAULT_RECORD_FMT; + struct changelog_rec *tmp; + int rc = 0; + + if (!cp || (cp->clp_magic != CHANGELOG_PRIV_MAGIC)) + return -EINVAL; + + if (rech == NULL) + return -EINVAL; + + *rech = malloc(CR_MAXSIZE); + if (*rech == NULL) + return -ENOMEM; + + if (cp->clp_send_flags & CHANGELOG_FLAG_JOBID) + rec_fmt |= CLF_JOBID; + + if (cp->clp_buf + cp->clp_buf_len <= cp->clp_buf_pos) { + ssize_t refresh; + + refresh = chlg_read_bulk(cp); + if (refresh == 0) { + /* EOF, CHANGELOG_FLAG_FOLLOW ignored for now LU-7659 */ + rc = 1; + goto out_free; + } else if (refresh < 0) { + rc = refresh; + goto out_free; + } + } + + /* TODO check changelog_rec_size */ + tmp = (struct changelog_rec *)cp->clp_buf_pos; + + memcpy(*rech, cp->clp_buf_pos, + changelog_rec_size(tmp) + tmp->cr_namelen); + + cp->clp_buf_pos += changelog_rec_size(tmp) + tmp->cr_namelen; + changelog_remap_rec(*rech, rec_fmt); + + return 0; + +out_free: + free(*rech); + *rech = NULL; + return rc; +} + +/** Release the changelog record when done with it. */ +int llapi_changelog_free(struct changelog_rec **rech) +{ + free(*rech); + *rech = NULL; + return 0; +} + +int llapi_changelog_clear(const char *mdtname, const char *idstr, + long long endrec) +{ + char dev_path[PATH_MAX]; + char cmd[64]; + size_t cmd_len = sizeof(cmd); + int fd; + int rc; + + if (endrec < 0) { + llapi_err_noerrno(LLAPI_MSG_ERROR, + "can't purge negative records\n"); + return -EINVAL; + } + + chlg_dev_path(dev_path, sizeof(dev_path), mdtname); + + rc = snprintf(cmd, cmd_len, "clear:%s:%lld", idstr, endrec); + if (rc >= sizeof(cmd)) + return -EINVAL; + + cmd_len = rc + 1; + + fd = open(dev_path, O_WRONLY); + if (fd < 0) { + rc = -errno; + llapi_error(LLAPI_MSG_ERROR, rc, "cannot open '%s'", dev_path); + return rc; + } + + rc = write(fd, cmd, cmd_len); + if (rc < 0) { + rc = -errno; + llapi_error(LLAPI_MSG_ERROR, rc, + "cannot purge records for '%s'", idstr); + goto out_close; + } + + rc = 0; + +out_close: + close(fd); + return rc; +} -- 1.8.3.1