From: Frank Zago Date: Wed, 2 Jul 2014 16:35:18 +0000 (-0500) Subject: LU-5095 hsm: Allow receiving messages to be non-blocking X-Git-Tag: 2.6.0-RC1~20 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=refs%2Fchanges%2F93%2F10393%2F9;ds=sidebyside LU-5095 hsm: Allow receiving messages to be non-blocking When using the HSM API for a copy tool, it's impossible to properly exit an application because one thread might be stuck in llapi_hsm_copytool_recv(), which in turn blocks forever on a read. So this patch expands the existing Lustre userspace HSM API to be able to apply flags to the read side of the communication pipe, so it can be opened with O_NONBLOCK, and gives access to the file descriptor for the read side of the pipe. So now a program can do a poll/select, and then call llapi_hsm_copytool_recv without blocking. Notes: The flags passed in from the copy tool to llapi_hsm_copytool_register are applied to the read side of the pipe using fcntl, because we do not want to apply the same flags to the read and write sides of the pipe. No functionality is added for setting flags on the write side of the pipe, since the kernel expects particular behavior from that side of the pipe, and it seems unwise to let user space modify it. Additionally, this patch changes llapi_hsm_copytool_recv to not return -EAGAIN when it finds a message for an archive not serviced by that copytool. Instead of returning -EAGAIN to the copy tool, which just restarts the receive operation, this patch just loops inside the recv function. This is because that -EAGAIN conflicts with the -EAGAIN returned from the call to libcfs_ukuc_msg_get, which can occur when O_NONBLOCK is set. Signed-off-by: Patrick Farrell Change-Id: I3e611d3c259de9a8c30f3939fb5e48ab88210c2e Reviewed-on: http://review.whamcloud.com/10393 Tested-by: Jenkins Reviewed-by: John L. 
Hammond Tested-by: Maloo Reviewed-by: Henri Doreau Reviewed-by: Oleg Drokin --- diff --git a/libcfs/include/libcfs/libcfs_kernelcomm.h b/libcfs/include/libcfs/libcfs_kernelcomm.h index ff1002f..0de5cd1 100644 --- a/libcfs/include/libcfs/libcfs_kernelcomm.h +++ b/libcfs/include/libcfs/libcfs_kernelcomm.h @@ -112,8 +112,9 @@ typedef struct lustre_kernelcomm { } __attribute__((packed)) lustre_kernelcomm; /* Userspace methods */ -extern int libcfs_ukuc_start(lustre_kernelcomm *l, int groups); +extern int libcfs_ukuc_start(lustre_kernelcomm *l, int groups, int rfd_flags); extern int libcfs_ukuc_stop(lustre_kernelcomm *l); +int libcfs_ukuc_get_rfd(lustre_kernelcomm *link); extern int libcfs_ukuc_msg_get(lustre_kernelcomm *l, char *buf, int maxsize, int transport); diff --git a/libcfs/libcfs/kernel_user_comm.c b/libcfs/libcfs/kernel_user_comm.c index fc92e2f..1c5e0ee 100644 --- a/libcfs/libcfs/kernel_user_comm.c +++ b/libcfs/libcfs/kernel_user_comm.c @@ -51,16 +51,25 @@ * @param link Private descriptor for pipe/socket. * @param groups KUC broadcast group to listen to * (can be null for unicast to this pid) + * @param rfd_flags flags for read side of pipe (e.g. O_NONBLOCK) */ -int libcfs_ukuc_start(lustre_kernelcomm *link, int group) +int libcfs_ukuc_start(lustre_kernelcomm *link, int group, int rfd_flags) { int pfd[2]; + int rc; link->lk_rfd = link->lk_wfd = LK_NOFD; if (pipe(pfd) < 0) return -errno; + if (fcntl(pfd[0], F_SETFL, rfd_flags) < 0) { + rc = -errno; + close(pfd[0]); + close(pfd[1]); + return rc; + } + memset(link, 0, sizeof(*link)); link->lk_rfd = pfd[0]; link->lk_wfd = pfd[1]; @@ -80,6 +89,15 @@ int libcfs_ukuc_stop(lustre_kernelcomm *link) return rc; } +/** Returns the file descriptor for the read side of the pipe, + * to be used with poll/select. + * @param link Private descriptor for pipe/socket. + */ +int libcfs_ukuc_get_rfd(lustre_kernelcomm *link) +{ + return link->lk_rfd; +} + #define lhsz sizeof(*kuch) /** Read a message from the link. 
diff --git a/lustre/include/lustre/lustreapi.h b/lustre/include/lustre/lustreapi.h index 54b9d1c..38244a2 100644 --- a/lustre/include/lustre/lustreapi.h +++ b/lustre/include/lustre/lustreapi.h @@ -328,9 +328,10 @@ struct hsm_copytool_private; struct hsm_copyaction_private; extern int llapi_hsm_copytool_register(struct hsm_copytool_private **priv, - const char *mnt, int flags, - int archive_count, int *archives); + const char *mnt, int archive_count, + int *archives, int rfd_flags); extern int llapi_hsm_copytool_unregister(struct hsm_copytool_private **priv); +extern int llapi_hsm_copytool_get_fd(struct hsm_copytool_private *ct); extern int llapi_hsm_copytool_recv(struct hsm_copytool_private *priv, struct hsm_action_list **hal, int *msgsize); extern int llapi_hsm_action_begin(struct hsm_copyaction_private **phcp, diff --git a/lustre/utils/lhsmtool_posix.c b/lustre/utils/lhsmtool_posix.c index 2e18351..ff157dc 100644 --- a/lustre/utils/lhsmtool_posix.c +++ b/lustre/utils/lhsmtool_posix.c @@ -1763,8 +1763,9 @@ static int ct_run(void) llapi_error_callback_set(llapi_hsm_log_error); } - rc = llapi_hsm_copytool_register(&ctdata, opt.o_mnt, 0, - opt.o_archive_cnt, opt.o_archive_id); + rc = llapi_hsm_copytool_register(&ctdata, opt.o_mnt, + opt.o_archive_cnt, + opt.o_archive_id, 0); if (rc < 0) { CT_ERROR(rc, "cannot start copytool interface"); return rc; @@ -1785,8 +1786,6 @@ static int ct_run(void) if (rc == -ESHUTDOWN) { CT_TRACE("shutting down"); break; - } else if (rc == -EAGAIN) { - continue; /* msg not for us */ } else if (rc < 0) { CT_WARN("cannot receive action list: %s", strerror(-rc)); diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index 8ef0fb6..f7f8e69 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -4130,7 +4130,7 @@ int llapi_changelog_start(void **priv, int flags, const char *device, cp->flags = flags; /* Set up the receiver */ - rc = libcfs_ukuc_start(&cp->kuc, 0 /* no group registration */); + rc = 
libcfs_ukuc_start(&cp->kuc, 0 /* no group registration */, 0); if (rc < 0) goto out_free; diff --git a/lustre/utils/liblustreapi_hsm.c b/lustre/utils/liblustreapi_hsm.c index c46435a..e47a7d9 100644 --- a/lustre/utils/liblustreapi_hsm.c +++ b/lustre/utils/liblustreapi_hsm.c @@ -641,13 +641,13 @@ out_free: /** Register a copytool * \param[out] priv Opaque private control structure * \param mnt Lustre filesystem mount point - * \param flags Open flags, currently unused (e.g. O_NONBLOCK) * \param archive_count * \param archives Which archive numbers this copytool is responsible for + * \param rfd_flags flags applied to read fd of pipe (e.g. O_NONBLOCK) */ int llapi_hsm_copytool_register(struct hsm_copytool_private **priv, - const char *mnt, int flags, int archive_count, - int *archives) + const char *mnt, int archive_count, + int *archives, int rfd_flags) { struct hsm_copytool_private *ct; int rc; @@ -712,7 +712,7 @@ int llapi_hsm_copytool_register(struct hsm_copytool_private **priv, ct->archives |= (1 << (archives[rc] - 1)); } - rc = libcfs_ukuc_start(&ct->kuc, KUC_GRP_HSM); + rc = libcfs_ukuc_start(&ct->kuc, KUC_GRP_HSM, rfd_flags); if (rc < 0) goto out_err; @@ -794,6 +794,19 @@ int llapi_hsm_copytool_unregister(struct hsm_copytool_private **priv) return 0; } +/** Returns a file descriptor to poll/select on. 
+ * \param ct Opaque private control structure + * \retval -EINVAL on error + * \retval the file descriptor for reading HSM events from the kernel + */ +int llapi_hsm_copytool_get_fd(struct hsm_copytool_private *ct) +{ + if (ct == NULL || ct->magic != CT_PRIV_MAGIC) + return -EINVAL; + + return libcfs_ukuc_get_rfd(&ct->kuc); +} + /** Wait for the next hsm_action_list * \param ct Opaque private control structure * \param halh Action list handle, will be allocated here @@ -818,6 +831,7 @@ int llapi_hsm_copytool_recv(struct hsm_copytool_private *ct, kuch = ct->kuch; +repeat: rc = libcfs_ukuc_msg_get(&ct->kuc, (char *)kuch, HAL_MAXSIZE + sizeof(*kuch), KUC_TRANSPORT_HSM); @@ -861,9 +875,8 @@ int llapi_hsm_copytool_recv(struct hsm_copytool_private *ct, " ignoring this request." " Mask of served archive is 0x%.8X", hal->hal_archive_id, ct->archives); - rc = -EAGAIN; - goto out_err; + goto repeat; } *halh = hal;