Whamcloud - gitweb
LU-5095 hsm: Allow receiving messages to be non-blocking 93/10393/9
authorFrank Zago <fzago@cray.com>
Wed, 2 Jul 2014 16:35:18 +0000 (11:35 -0500)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 8 Jul 2014 15:31:10 +0000 (15:31 +0000)
When using the HSM API for a copy tool, it's impossible to
properly exit an application because one thread might be
stuck in llapi_hsm_copytool_recv(), which in turn blocks
forever on a read.

So this patch expands the existing Lustre userspace HSM API
to be able to apply flags to the read side of the
communication pipe, so it can be opened with O_NONBLOCK,
and gives access to the pipe file descriptor used. (read side)

So now a program can do a poll/select, and then call
llapi_hsm_copytool_recv without blocking.

Notes: The flags passed in from the copy tool to
llapi_hsm_copytool_register are applied to the read side of
the pipe using fcntl, because we do not want to apply
the same flags to the read and write sides of the pipe.

No functionality is added for setting flags on the write
side of the pipe, since the kernel expects particular
behavior from that side of the pipe, and it seems unwise
to let user space modify it.

Additionally, this patch changes llapi_hsm_copytool_recv to
not return -EAGAIN when it finds a message for an archive
not serviced by that copytool.

Instead of returning -EAGAIN to the copy tool, which just
restarts the receive operation, this patch just loops inside
the recv function.

This is because this -EAGAIN conflicts with -EAGAIN
returned from the call to libcfs_ukuc_msg_get
which can occur when O_NONBLOCK is set.

Signed-off-by: Patrick Farrell <paf@cray.com>
Change-Id: I3e611d3c259de9a8c30f3939fb5e48ab88210c2e
Reviewed-on: http://review.whamcloud.com/10393
Tested-by: Jenkins
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Henri Doreau <henri.doreau@cea.fr>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
libcfs/include/libcfs/libcfs_kernelcomm.h
libcfs/libcfs/kernel_user_comm.c
lustre/include/lustre/lustreapi.h
lustre/utils/lhsmtool_posix.c
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_hsm.c

index ff1002f..0de5cd1 100644 (file)
@@ -112,8 +112,9 @@ typedef struct lustre_kernelcomm {
 } __attribute__((packed)) lustre_kernelcomm;
 
 /* Userspace methods */
 } __attribute__((packed)) lustre_kernelcomm;
 
 /* Userspace methods */
-extern int libcfs_ukuc_start(lustre_kernelcomm *l, int groups);
+extern int libcfs_ukuc_start(lustre_kernelcomm *l, int groups, int rfd_flags);
 extern int libcfs_ukuc_stop(lustre_kernelcomm *l);
 extern int libcfs_ukuc_stop(lustre_kernelcomm *l);
+int libcfs_ukuc_get_rfd(lustre_kernelcomm *link);
 extern int libcfs_ukuc_msg_get(lustre_kernelcomm *l, char *buf, int maxsize,
                                int transport);
 
 extern int libcfs_ukuc_msg_get(lustre_kernelcomm *l, char *buf, int maxsize,
                                int transport);
 
index fc92e2f..1c5e0ee 100644 (file)
  * @param link Private descriptor for pipe/socket.
  * @param groups KUC broadcast group to listen to
  *          (can be null for unicast to this pid)
  * @param link Private descriptor for pipe/socket.
  * @param groups KUC broadcast group to listen to
  *          (can be null for unicast to this pid)
+ * @param rfd_flags flags for read side of pipe (e.g. O_NONBLOCK)
  */
  */
-int libcfs_ukuc_start(lustre_kernelcomm *link, int group)
+int libcfs_ukuc_start(lustre_kernelcomm *link, int group, int rfd_flags)
 {
        int pfd[2];
 {
        int pfd[2];
+       int rc;
 
        link->lk_rfd = link->lk_wfd = LK_NOFD;
 
        if (pipe(pfd) < 0)
                return -errno;
 
 
        link->lk_rfd = link->lk_wfd = LK_NOFD;
 
        if (pipe(pfd) < 0)
                return -errno;
 
+       if (fcntl(pfd[0], F_SETFL, rfd_flags) < 0) {
+               rc = -errno;
+               close(pfd[0]);
+               close(pfd[1]);
+               return rc;
+       }
+
        memset(link, 0, sizeof(*link));
        link->lk_rfd = pfd[0];
        link->lk_wfd = pfd[1];
        memset(link, 0, sizeof(*link));
        link->lk_rfd = pfd[0];
        link->lk_wfd = pfd[1];
@@ -80,6 +89,15 @@ int libcfs_ukuc_stop(lustre_kernelcomm *link)
        return rc;
 }
 
        return rc;
 }
 
+/** Returns the file descriptor for the read side of the pipe,
+ *  to be used with poll/select.
+ * @param link Private descriptor for pipe/socket.
+ */
+int libcfs_ukuc_get_rfd(lustre_kernelcomm *link)
+{
+       return link->lk_rfd;
+}
+
 #define lhsz sizeof(*kuch)
 
 /** Read a message from the link.
 #define lhsz sizeof(*kuch)
 
 /** Read a message from the link.
index 54b9d1c..38244a2 100644 (file)
@@ -328,9 +328,10 @@ struct hsm_copytool_private;
 struct hsm_copyaction_private;
 
 extern int llapi_hsm_copytool_register(struct hsm_copytool_private **priv,
 struct hsm_copyaction_private;
 
 extern int llapi_hsm_copytool_register(struct hsm_copytool_private **priv,
-                                      const char *mnt, int flags,
-                                      int archive_count, int *archives);
+                                      const char *mnt, int archive_count,
+                                      int *archives, int rfd_flags);
 extern int llapi_hsm_copytool_unregister(struct hsm_copytool_private **priv);
 extern int llapi_hsm_copytool_unregister(struct hsm_copytool_private **priv);
+extern int llapi_hsm_copytool_get_fd(struct hsm_copytool_private *ct);
 extern int llapi_hsm_copytool_recv(struct hsm_copytool_private *priv,
                                   struct hsm_action_list **hal, int *msgsize);
 extern int llapi_hsm_action_begin(struct hsm_copyaction_private **phcp,
 extern int llapi_hsm_copytool_recv(struct hsm_copytool_private *priv,
                                   struct hsm_action_list **hal, int *msgsize);
 extern int llapi_hsm_action_begin(struct hsm_copyaction_private **phcp,
index 2e18351..ff157dc 100644 (file)
@@ -1763,8 +1763,9 @@ static int ct_run(void)
                llapi_error_callback_set(llapi_hsm_log_error);
        }
 
                llapi_error_callback_set(llapi_hsm_log_error);
        }
 
-       rc = llapi_hsm_copytool_register(&ctdata, opt.o_mnt, 0,
-                                        opt.o_archive_cnt, opt.o_archive_id);
+       rc = llapi_hsm_copytool_register(&ctdata, opt.o_mnt,
+                                        opt.o_archive_cnt,
+                                        opt.o_archive_id, 0);
        if (rc < 0) {
                CT_ERROR(rc, "cannot start copytool interface");
                return rc;
        if (rc < 0) {
                CT_ERROR(rc, "cannot start copytool interface");
                return rc;
@@ -1785,8 +1786,6 @@ static int ct_run(void)
                if (rc == -ESHUTDOWN) {
                        CT_TRACE("shutting down");
                        break;
                if (rc == -ESHUTDOWN) {
                        CT_TRACE("shutting down");
                        break;
-               } else if (rc == -EAGAIN) {
-                       continue; /* msg not for us */
                } else if (rc < 0) {
                        CT_WARN("cannot receive action list: %s",
                                strerror(-rc));
                } else if (rc < 0) {
                        CT_WARN("cannot receive action list: %s",
                                strerror(-rc));
index 8ef0fb6..f7f8e69 100644 (file)
@@ -4130,7 +4130,7 @@ int llapi_changelog_start(void **priv, int flags, const char *device,
         cp->flags = flags;
 
         /* Set up the receiver */
         cp->flags = flags;
 
         /* Set up the receiver */
-        rc = libcfs_ukuc_start(&cp->kuc, 0 /* no group registration */);
+       rc = libcfs_ukuc_start(&cp->kuc, 0 /* no group registration */, 0);
         if (rc < 0)
                 goto out_free;
 
         if (rc < 0)
                 goto out_free;
 
index c46435a..e47a7d9 100644 (file)
@@ -641,13 +641,13 @@ out_free:
 /** Register a copytool
  * \param[out] priv Opaque private control structure
  * \param mnt Lustre filesystem mount point
 /** Register a copytool
  * \param[out] priv Opaque private control structure
  * \param mnt Lustre filesystem mount point
- * \param flags Open flags, currently unused (e.g. O_NONBLOCK)
  * \param archive_count
  * \param archives Which archive numbers this copytool is responsible for
  * \param archive_count
  * \param archives Which archive numbers this copytool is responsible for
+ * \param rfd_flags flags applied to read fd of pipe (e.g. O_NONBLOCK)
  */
 int llapi_hsm_copytool_register(struct hsm_copytool_private **priv,
  */
 int llapi_hsm_copytool_register(struct hsm_copytool_private **priv,
-                               const char *mnt, int flags, int archive_count,
-                               int *archives)
+                               const char *mnt, int archive_count,
+                               int *archives, int rfd_flags)
 {
        struct hsm_copytool_private     *ct;
        int                              rc;
 {
        struct hsm_copytool_private     *ct;
        int                              rc;
@@ -712,7 +712,7 @@ int llapi_hsm_copytool_register(struct hsm_copytool_private **priv,
                ct->archives |= (1 << (archives[rc] - 1));
        }
 
                ct->archives |= (1 << (archives[rc] - 1));
        }
 
-       rc = libcfs_ukuc_start(&ct->kuc, KUC_GRP_HSM);
+       rc = libcfs_ukuc_start(&ct->kuc, KUC_GRP_HSM, rfd_flags);
        if (rc < 0)
                goto out_err;
 
        if (rc < 0)
                goto out_err;
 
@@ -794,6 +794,19 @@ int llapi_hsm_copytool_unregister(struct hsm_copytool_private **priv)
        return 0;
 }
 
        return 0;
 }
 
+/** Returns a file descriptor to poll/select on.
+ * \param ct Opaque private control structure
+ * \retval -EINVAL on error
+ * \retval the file descriptor for reading HSM events from the kernel
+ */
+int llapi_hsm_copytool_get_fd(struct hsm_copytool_private *ct)
+{
+       if (ct == NULL || ct->magic != CT_PRIV_MAGIC)
+               return -EINVAL;
+
+       return libcfs_ukuc_get_rfd(&ct->kuc);
+}
+
 /** Wait for the next hsm_action_list
  * \param ct Opaque private control structure
  * \param halh Action list handle, will be allocated here
 /** Wait for the next hsm_action_list
  * \param ct Opaque private control structure
  * \param halh Action list handle, will be allocated here
@@ -818,6 +831,7 @@ int llapi_hsm_copytool_recv(struct hsm_copytool_private *ct,
 
        kuch = ct->kuch;
 
 
        kuch = ct->kuch;
 
+repeat:
        rc = libcfs_ukuc_msg_get(&ct->kuc, (char *)kuch,
                                 HAL_MAXSIZE + sizeof(*kuch),
                                 KUC_TRANSPORT_HSM);
        rc = libcfs_ukuc_msg_get(&ct->kuc, (char *)kuch,
                                 HAL_MAXSIZE + sizeof(*kuch),
                                 KUC_TRANSPORT_HSM);
@@ -861,9 +875,8 @@ int llapi_hsm_copytool_recv(struct hsm_copytool_private *ct,
                                  " ignoring this request."
                                  " Mask of served archive is 0x%.8X",
                                  hal->hal_archive_id, ct->archives);
                                  " ignoring this request."
                                  " Mask of served archive is 0x%.8X",
                                  hal->hal_archive_id, ct->archives);
-               rc = -EAGAIN;
 
 
-               goto out_err;
+               goto repeat;
        }
 
        *halh = hal;
        }
 
        *halh = hal;