Whamcloud - gitweb
LU-12373 pcc: uncache the pcc copies when remove a PCC backend
authorQian Yingjin <qian@ddn.com>
Fri, 14 Jun 2019 09:29:55 +0000 (05:29 -0400)
committerAndreas Dilger <adilger@whamcloud.com>
Thu, 25 Mar 2021 14:14:29 +0000 (14:14 +0000)
Currently when remove a PCC backend from a client, it does not
make any special handling for previously cached files at all.
Users can still use PCC caching service for these files. This
may not what users want. The reason is as follows:

1) For RW-PCC cached files, it does not restore the data back
into Lustre OSTs of the main filesystem. Although the PCC
backend falls back as a tranditional HSM storage solution
since the lhsmtool_posix copytool is still running at this
client. But this is dangerous, and likly to cause user data
to be lost if the PCC device may be permanently unavailable.

2) The space used by these PCC cached files may not released.

In this patch, when remove a PCC backend from a client, the
default action is to scan the PCC backend fs, uncache
(detach and remove) the PCC copy from PCC by FID.

We also add an option "--keep|-k" for PCC backend removal.
It behaves as before, just remove the PCC backend, but
retain the data on the cache.

This patch also introduces a common library to scan the HSM
backend.

Test-Parameters: clientcount=3 testlist=sanity-pcc,sanity-pcc,sanity-pcc
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: Ib4db36137c025fd78c7022c8b8c39b63e3b9ad4d
Reviewed-on: https://review.whamcloud.com/41919
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Li Xi <lixi@ddn.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
18 files changed:
lustre/doc/Makefile.am
lustre/doc/lctl-pcc.8
lustre/doc/llapi_pcc_clear.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_del.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_detach_fid_fd.3
lustre/include/lustre/lustreapi.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/pcc.c
lustre/llite/pcc.h
lustre/tests/sanity-pcc.sh
lustre/utils/Makefile.am
lustre/utils/lfs.c
lustre/utils/libhsm_scanner.c [new file with mode: 0644]
lustre/utils/libhsm_scanner.h [new file with mode: 0644]
lustre/utils/liblustreapi_pcc.c
lustre/utils/obd.c

index 24bf4ee..436ec1f 100644 (file)
@@ -174,6 +174,8 @@ LIBMAN =                                    \
        llapi_pccdev_set.3                      \
        llapi_pcc_state_get.3                   \
        llapi_pcc_state_get_fd.3                \
+       llapi_pcc_clear.3                       \
+       llapi_pcc_del.3                         \
        llapi_quotactl.3                        \
        llapi_rmfid.3                           \
        llapi_search_mdt.3                      \
index 759a670..50045bb 100644 (file)
@@ -4,9 +4,9 @@ lctl pcc commands used to interact with PCC features.
 .SH SYNOPSIS
 .B lctl pcc add \fR<\fImntpath\fR> <\fIpccpath\fR> [\fB--param\fR|\fB-p\fR <\fIparam\fR>]
 .br
-.B lctl pcc del <\fImntpath\fR> <\fIpccpath\fR>
+.B lctl pcc del [\fB--keep\fR|\fB-k\fR] <\fImntpath\fR> <\fIpccpath\fR>
 .br
-.B lctl pcc clear <\fImntpath\fR>
+.B lctl pcc clear [\fB--keep\fR|\fB-k\fR] <\fImntpath\fR>
 .br
 .B lctl pcc list <\fImntpath\fR>
 .SH DESCRIPTION
@@ -28,13 +28,13 @@ and the suffix of the file name is "h5". "rwid" represents the read-write
 attach id (2) which value is same as the archive ID of the copytool agent
 running on this PCC node.
 .TP
-.B lctl pcc del <\fImntpath\fR> <\fIpccpath\fR>
+.B lctl pcc del [\fB--keep\fR|\fB-k\fR] <\fImntpath\fR> <\fIpccpath\fR>
 Delete a PCC backend specified by path
 .IR pccpath
 on a Lustre client referenced by the mount point of
 .IR mntpath .
 .TP
-.B lctl pcc clear <\fImntpath\fR>
+.B lctl pcc clear [\fB--keep\fR|\fB-k\fR] <\fImntpath\fR>
 Remove all PCC backend on a Lustre client referenced by the mount point of
 .IR mntpath .
 .TP
@@ -46,6 +46,14 @@ List all PCC backends on a Lustre client referenced by the mount point of
 .B --param | -p
 Specifies the configuration parameters for a PCC backend.
 .TP
+.B --keep | -k
+By default, when remove a PCC backend from a client, the action is to scan the
+PCC backend fs, uncache (detach and remove) all scanned PCC copies from PCC by
+FIDs. For this option "--keep|-k", it just removes the PCC backend from the
+Lustre client, and retains the data on the cache. At this time, the PCC-RW
+backend falls back as a tranditional HSM storage solution since the copytool is
+still running at this client.
+.TP
 .SH SEE ALSO
 .BR lfs (1),
 .BR lfs-hsm (1),
diff --git a/lustre/doc/llapi_pcc_clear.3 b/lustre/doc/llapi_pcc_clear.3
new file mode 100644 (file)
index 0000000..e8ca12e
--- /dev/null
@@ -0,0 +1,39 @@
+.TH llapi_pcc_clear 3 "2019 June 20" "Lustre User API"
+.SH NAME
+llapi_pcc_clear \- Remove all PCC backends from a client
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pcc_clear(const char *" mntpath ", bool " keep_data );
+.fi
+.SH DESCRIPTION
+.PP
+The function
+.BR llapi_pcc_clear()
+deletes all PCC backends on the client with the mount point referenced by
+.IR mntpath .
+Reference
+.BR llapi_pcc_del()
+for the usage of the input parameter
+.IR keep_data .
+.SH RETURN VALUES
+.PP
+.B llapi_pcc_clear()
+returns 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC backend operation is not supported.
+.SH "SEE ALSO"
+.BR llapi_pcc_del (3),
+.BR lustreapi (7)
diff --git a/lustre/doc/llapi_pcc_del.3 b/lustre/doc/llapi_pcc_del.3
new file mode 100644 (file)
index 0000000..23cb577
--- /dev/null
@@ -0,0 +1,45 @@
+.TH llapi_pcc_del 3 "2019 June 20" "Lustre User API"
+.SH NAME
+llapi_pcc_del \- Delete a PCC backend from a client
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pcc_del(const char *" mntpath ", const char *" pccpath ",
+.BI "                  __u32 " flags );
+.fi
+.SH DESCRIPTION
+.PP
+The function
+.BR llapi_pcc_del()
+deletes a PCC backend referenced by
+.IR pccpath
+on the client with the mount point of
+.IR mntpath .
+By default, when remove a PCC backend from a client, the action is to scan the
+PCC backend fs, uncache (detach and remove) all scanned PCC copies from PCC by
+FIDs. The input parameter
+.IR keep_data
+means that it just removes the PCC backend from the Lustre client, and retains
+the data on the cache. In this case, the PCC-RW backend falls back as a
+tranditional HSM storage solution as long as the copytool is still running at
+this client.
+.SH RETURN VALUES
+.PP
+.B llapi_pcc_del()
+returns 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC backend operation is not supported.
+.SH "SEE ALSO"
+.BR lustreapi (7)
index 0afe243..ba49ff7 100644 (file)
@@ -6,13 +6,16 @@ llapi_pcc_detach_file \- detach the given file from PCC
 .nf
 .B #include <lustre/lustreapi.h>
 .PP
-.BI "int llapi_pcc_detach_fid_fd(int " dirfd ", const struct lu_fid *" fid ");"
+.BI "int llapi_pcc_detach_fid_fd(int " dirfd ", const struct lu_fid *" fid ",
+.BI "                            __u32 " flags );
 .PP
-.BI "int llapi_pcc_detach_fid(const char *" mntpath ", const struct lu_fid *" fid ");"
+.BI "int llapi_pcc_detach_fid(const char *" mntpath ",
+.BI "                         const struct lu_fid *" fid ", __u32 " flags );
 .PP
-.BI "int llapi_pcc_detach_fid_str(const char *" mntpath ", const char *" fidstr ");"
+.BI "int llapi_pcc_detach_fid_str(const char *" mntpath ",
+.BI "                             const char *" fidstr ", __u32 " flags );
 .PP
-.BI "int llapi_pcc_detach_file(const char *" path ");"
+.BI "int llapi_pcc_detach_file(const char *" path ", __u32 " flags );
 .fi
 .SH DESCRIPTION
 .PP
@@ -21,7 +24,8 @@ llapi_pcc_detach_file \- detach the given file from PCC
 .BR llapi_pcc_detach_fid_str() ,
 and
 .BR llapi_pcc_detach_file()
-detaches a cached file from PCC by an ioctl on the dir. The file is referenced
+detach a cached file from PCC by an ioctl on the dir or the file itself. The
+file is referenced
 by
 .IR fid ,
 .IR fidstr ,
@@ -32,6 +36,35 @@ is referenced by
 .IR dirfd ,
 .IR mntpath ,
 .IR path .
+The detach flags is specified by
+.IR flags
+argument, which is a
+.B enum lu_pcc_detach_flags
+data structure, which contains the following values:
+.nf
+.LP
+       PCC_DETACH_FL_NONE                      = 0x0,
+       PCC_DETACH_FL_UNCACHE           = 0x01,
+       PCC_DETACH_FL_KNOWN_READWRITE   = 0x02,
+       PCC_DETACH_FL_KNOWN_READONLY    = 0x04,
+       PCC_DETACH_FL_CACHE_REMOVED     = 0x08,
+.fi
+.TP
+PCC_DETACH_FL_NONE
+means that detach only and retain the PCC copy.
+.TP
+PCC_DETACH_FL_UNCACHE
+means that remove the PCC copy after detach.
+.TP
+PCC_DETACH_KNOWN_READWRITE
+means that known the file was once used as PCC-RW.
+.TP
+PCC_DETACH_KNOWN_READONLY
+means that known the file was once used as PCC-RO.
+.TP
+PCC_DETACH_FL_CACHE_REMOVED
+indicates that PCC cached copy is removed. It is used to tell the user space
+caller that the file is detached and the corresponding PCC copy is removed.
 .SH RETURN VALUES
 .LP
 .BR llapi_pcc_detach_fid_fd() ,
index 9698497..49cc02a 100644 (file)
@@ -621,16 +621,18 @@ int llapi_pcc_attach_fid(const char *mntpath, const struct lu_fid *fid,
                         __u32 id, enum lu_pcc_type type);
 int llapi_pcc_attach_fid_str(const char *mntpath, const char *fidstr,
                             __u32 id, enum lu_pcc_type type);
-int llapi_pcc_detach_fd(int fd, __u32 option);
+int llapi_pcc_detach_fd(int fd, __u32 flags);
 int llapi_pcc_detach_fid(const char *mntpath, const struct lu_fid *fid,
-                        __u32 option);
+                        __u32 flags);
 int llapi_pcc_detach_fid_str(const char *mntpath, const char *fidstr,
-                            __u32 option);
-int llapi_pcc_detach_file(const char *path, __u32 option);
+                            __u32 flags);
+int llapi_pcc_detach_file(const char *path, __u32 flags);
 int llapi_pcc_state_get_fd(int fd, struct lu_pcc_state *state);
 int llapi_pcc_state_get(const char *path, struct lu_pcc_state *state);
 int llapi_pccdev_set(const char *mntpath, const char *cmd);
 int llapi_pccdev_get(const char *mntpath);
+int llapi_pcc_del(const char *mntpath, const char *pccpath, __u32 flags);
+int llapi_pcc_clear(const char *mntpath, __u32 flags);
 /** @} llapi */
 
 /* llapi_layout user interface */
index 7ee2355..3eaf19a 100644 (file)
@@ -2665,24 +2665,60 @@ static inline const char *pcc_type2string(enum lu_pcc_type type)
        }
 }
 
+#define PCC_YAML_PCCPATH       "pccpath"
+#define PCC_YAML_HSMTOOL       "hsmtool"
+#define PCC_YAML_RWID          "rwid"
+#define PCC_YAML_ROID          "roid"
+#define PCC_YAML_FLAGS         "flags"
+#define PCC_YAML_AUTOCACHE     "autocache"
+
+enum hsmtool_type {
+       HSMTOOL_UNKNOWN = 0,
+       HSMTOOL_POSIX   = 1,
+};
+
+static inline const char *hsmtool_type2string(enum hsmtool_type type)
+{
+       switch (type) {
+       case HSMTOOL_POSIX:
+               return "posix";
+       default:
+               return "unknown";
+       }
+}
+
+static inline enum hsmtool_type hsmtool_string2type(const char *str)
+{
+       if (strcmp(str, "posix") == 0)
+               return HSMTOOL_POSIX;
+
+       return HSMTOOL_UNKNOWN;
+}
+
 struct lu_pcc_attach {
        __u32 pcca_type; /* PCC type */
        __u32 pcca_id; /* Attach ID */
 };
 
-enum lu_pcc_detach_opts {
-       PCC_DETACH_OPT_NONE = 0, /* Detach only, keep the PCC copy */
-       PCC_DETACH_OPT_UNCACHE, /* Remove the cached file after detach */
+enum lu_pcc_detach_flags {
+       /* Detach only, keep the PCC copy */
+       PCC_DETACH_FL_NONE              = 0x0,
+       /* Remove the cached file after detach */
+       PCC_DETACH_FL_UNCACHE           = 0x01,
+       /* Known the file was once used as PCC-RW */
+       PCC_DETACH_FL_KNOWN_READWRITE   = 0x02,
+       /* Known the file was once used as PCC-RO */
+       PCC_DETACH_FL_KNOWN_READONLY    = 0x04,
 };
 
 struct lu_pcc_detach_fid {
        /* fid of the file to detach */
        struct lu_fid   pccd_fid;
-       __u32           pccd_opt;
+       __u32           pccd_flags;
 };
 
 struct lu_pcc_detach {
-       __u32           pccd_opt;
+       __u32           pccd_flags;
 };
 
 enum lu_pcc_state_flags {
@@ -2701,6 +2737,12 @@ struct lu_pcc_state {
        char    pccs_path[PATH_MAX];
 };
 
+enum lu_pcc_cleanup_flags {
+       PCC_CLEANUP_FL_NONE             = 0x0,
+       /* Remove the PCC backend but retain the data on the cache */
+       PCC_CLEANUP_FL_KEEP_DATA        = 0x1,
+};
+
 struct fid_array {
        __u32 fa_nr;
        /* make header's size equal lu_fid */
index dbacb2a..7d556dc 100644 (file)
@@ -2157,7 +2157,7 @@ migrate_free:
                if (!inode_owner_or_capable(inode2))
                        GOTO(out_iput, rc = -EPERM);
 
-               rc = pcc_ioctl_detach(inode2, detach->pccd_opt);
+               rc = pcc_ioctl_detach(inode2, detach->pccd_flags);
 out_iput:
                iput(inode2);
 out_detach:
index c43e42d..ea18207 100644 (file)
@@ -4044,7 +4044,7 @@ out_pcc:
                if (!inode_owner_or_capable(inode))
                        GOTO(out_detach_free, rc = -EPERM);
 
-               rc = pcc_ioctl_detach(inode, detach->pccd_opt);
+               rc = pcc_ioctl_detach(inode, detach->pccd_flags);
 out_detach_free:
                OBD_FREE_PTR(detach);
                RETURN(rc);
index efdd307..7f2c03f 100644 (file)
@@ -769,6 +769,7 @@ pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
        dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
        dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
        atomic_set(&dataset->pccd_refcount, 1);
+       dataset->pccd_hsmtool_type = HSMTOOL_POSIX;
 
        rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
        if (rc) {
@@ -866,10 +867,16 @@ pcc_dataset_del(struct pcc_super *super, char *pathname)
 static void
 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
 {
-       seq_printf(m, "%s:\n", dataset->pccd_pathname);
-       seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
-       seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
-       seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
+       seq_puts(m, "  -\n");
+       seq_printf(m, "    " PCC_YAML_PCCPATH ": %s\n",
+                  dataset->pccd_pathname);
+       seq_printf(m, "    " PCC_YAML_HSMTOOL ": %s\n",
+                  hsmtool_type2string(dataset->pccd_hsmtool_type));
+       seq_printf(m, "    " PCC_YAML_RWID ": %u\n", dataset->pccd_rwid);
+       seq_printf(m, "    " PCC_YAML_ROID ": %u\n", dataset->pccd_roid);
+       seq_printf(m, "    " PCC_YAML_FLAGS ": %x\n", dataset->pccd_flags);
+       seq_printf(m, "    " PCC_YAML_AUTOCACHE ": %s\n",
+                  dataset->pccd_rule.pmr_conds_str);
 }
 
 int
@@ -878,6 +885,8 @@ pcc_super_dump(struct pcc_super *super, struct seq_file *m)
        struct pcc_dataset *dataset;
 
        down_read(&super->pccs_rw_sem);
+       if (!list_empty(&super->pccs_datasets))
+               seq_puts(m, "pcc:\n");
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
                pcc_dataset_dump(dataset, m);
        }
@@ -1680,7 +1689,6 @@ static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
                wake_up_all(&pcci->pcci_waitq);
 }
 
-
 static ssize_t
 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
 {
@@ -2103,7 +2111,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                CDEBUG(D_MMAP,
                       "%s: PCC backend fs not support ->page_mkwrite()\n",
                       ll_i2sbi(inode)->ll_fsname);
-               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
+               (void) pcc_ioctl_detach(inode, PCC_DETACH_FL_UNCACHE);
                up_read(&mm->mmap_sem);
                *cached = true;
                RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
@@ -2130,7 +2138,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                 * __do_page_fault and retry the memory fault handling.
                 */
                if (page->mapping == pcc_file->f_mapping) {
-                       pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
+                       pcc_ioctl_detach(inode, PCC_DETACH_FL_UNCACHE);
                        *cached = true;
                        up_read(&mm->mmap_sem);
                        RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
@@ -2162,7 +2170,7 @@ out:
         * Lustre I/O path.
         */
        if (rc & VM_FAULT_SIGBUS) {
-               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
+               (void) pcc_ioctl_detach(inode, PCC_DETACH_FL_UNCACHE);
                up_read(&mm->mmap_sem);
                RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
        }
@@ -2955,11 +2963,13 @@ static int pcc_hsm_remove(struct inode *inode)
        if (rc) {
                CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
                       PFID(&ll_i2info(inode)->lli_fid), rc);
-               RETURN(rc);
+               /* ignore the RESTORE failure.
+                * i.e. the file is in exists dirty archived state.
+                */
+       } else {
+               ll_layout_refresh(inode, &gen);
        }
 
-       ll_layout_refresh(inode, &gen);
-
        len = sizeof(struct hsm_user_request) +
              sizeof(struct hsm_user_item);
        OBD_ALLOC(hur, len);
@@ -2984,7 +2994,7 @@ static int pcc_hsm_remove(struct inode *inode)
        RETURN(rc);
 }
 
-int pcc_ioctl_detach(struct inode *inode, __u32 opt)
+int pcc_ioctl_detach(struct inode *inode, __u32 flags)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
@@ -3003,7 +3013,7 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
        LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
 
        if (pcci->pcci_type == LU_PCC_READWRITE) {
-               if (opt == PCC_DETACH_OPT_UNCACHE) {
+               if (flags & PCC_DETACH_FL_UNCACHE) {
                        hsm_remove = true;
                        /*
                         * The file will be removed from PCC, set the flags
@@ -3018,7 +3028,7 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
        } else if (pcci->pcci_type == LU_PCC_READONLY) {
                __pcc_layout_invalidate(pcci);
 
-               if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
+               if (flags & PCC_DETACH_FL_UNCACHE && !pcci->pcci_unlinked) {
                        old_cred =  override_creds(pcc_super_cred(inode->i_sb));
                        rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
                        revert_creds(old_cred);
@@ -3033,7 +3043,9 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
 
 out_unlock:
        pcc_inode_unlock(inode);
-       if (hsm_remove) {
+
+       if (hsm_remove || (flags & PCC_DETACH_FL_UNCACHE &&
+                          flags & PCC_DETACH_FL_KNOWN_READWRITE)) {
                old_cred = override_creds(pcc_super_cred(inode->i_sb));
                rc = pcc_hsm_remove(inode);
                revert_creds(old_cred);
index 5766940..842a27f 100644 (file)
@@ -120,6 +120,7 @@ struct pcc_dataset {
        struct path             pccd_path;       /* Root path */
        struct list_head        pccd_linkage;  /* Linked to pccs_datasets */
        atomic_t                pccd_refcount; /* Reference count */
+       enum hsmtool_type       pccd_hsmtool_type; /*HSM copytool type */
 };
 
 struct pcc_super {
@@ -231,7 +232,7 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
                              bool attached);
 int pcc_ioctl_attach(struct file *file, struct inode *inode,
                     struct lu_pcc_attach *attach);
-int pcc_ioctl_detach(struct inode *inode, __u32 opt);
+int pcc_ioctl_detach(struct inode *inode, __u32 flags);
 int pcc_ioctl_state(struct file *file, struct inode *inode,
                    struct lu_pcc_state *state);
 void pcc_file_init(struct pcc_file *pccf);
index 008256a..6ca3470 100644 (file)
@@ -2545,6 +2545,143 @@ test_29b() {
 }
 run_test 29b "Auto PCC-RO attach in atomic_open"
 
+test_30() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       mkdir $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+
+       file=$DIR/$tdir/rwattach
+       echo -n backend_del_attach > $file
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+
+       file=$DIR/$tdir/rwattachrm
+       echo -n backend_del_attach_rm > $file
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/roattach
+       echo -n backend_del_roattach_rm > $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RO-PCC attach $file failed"
+
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       do_facet $SINGLEAGT $LCTL pcc del -v -v -v -v $MOUNT $hsm_root ||
+               error "lctl pcc del $MOUNT $hsm_root failed"
+}
+run_test 30 "Test lctl pcc del command"
+
+test_31() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local -a lpcc_path1
+       local -a lpcc_path2
+       local -a lpcc_path3
+       local file
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       mkdir $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+
+       file=$DIR/$tdir/rwattach
+       echo -n backend_del_attach > $file
+       lpcc_path1=$(lpcc_fid2path $hsm_root $file)
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RW-PCC detach $file failed"
+       check_lpcc_state $file "none"
+
+       file=$DIR/$tdir/rwattachrm
+       echo -n backend_del_attach_rm > $file
+       lpcc_path2=$(lpcc_fid2path $hsm_root $file)
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RW-PCC detach $file failed"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/roattach
+       echo -n backend_del_roattach_rm > $file
+       lpcc_path3=$(lpcc_fid2path $hsm_root $file "readonly")
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RO-PCC attach $file failed"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RO-PCC detach $file failed"
+       check_lpcc_state $file "none"
+
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       do_facet $SINGLEAGT $LCTL pcc del -v -v -v -v -k $MOUNT $hsm_root ||
+               error "lctl pcc del -k $MOUNT $hsm_root failed"
+
+       do_facet $SINGLEAGT "[ -f $lpcc_path1 ]" ||
+               error "PCC copy $lpcc_path1 should retain"
+       do_facet $SINGLEAGT "[ -f $lpcc_path2 ]" ||
+               error "PCC copy $lpcc_path1 should retain"
+       do_facet $SINGLEAGT "[ -f $lpcc_path3 ]" ||
+               error "PCC copy $lpcc_path1 should retain"
+}
+run_test 31 "Test lctl pcc del command with --keep option"
+
+test_32() {
+       local agt_host=$(facet_active_host $SINGLEAGT)
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+       local -a lpcc_path
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       do_facet $SINGLEAGT echo -n roattach_removed > $file
+       lpcc_path=$(lpcc_fid2path $hsm_root $file "readonly")
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RO-PCC attach $file failed"
+       rmultiop_start $agt_host $file o_rc || error "multiop $file failed"
+       sleep 3
+       do_facet $SINGLEAGT rm $lpcc_path || error "rm $lpcc_path failed"
+       rmultiop_stop $agt_host || error "multiop $file read failed"
+       check_lpcc_state $file "readonly"
+
+       local content=$(do_facet $SINGLEAGT cat $file)
+       [[ $content == "roattach_removed" ]] || error "data mismatch: $content"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RO-PCC detach $file failed"
+       check_lpcc_state $file "none"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RO-PCC attach $file failed"
+       do_facet $SINGLEAGT rm $lpcc_path || error "rm $lpcc_path failed"
+       check_lpcc_state $file "readonly"
+       content=$(do_facet $SINGLEAGT cat $file)
+       [[ $content == "roattach_removed" ]] || error "data mismatch: $content"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RO-PCC detach $file failed"
+       check_lpcc_state $file "none"
+}
+run_test 32 "Test for RO-PCC when PCC copy is deleted"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index edae77e..7475c63 100644 (file)
@@ -93,6 +93,7 @@ endif
 
 llverfs_LDADD := $(EXT2FSLIB) $(E2PLIB)
 
+liblustreapi_la_CFLAGS := $(AM_CFLAGS) -I $(top_builddir)/lnet/utils
 liblustreapi_la_SOURCES = liblustreapi.c liblustreapi_hsm.c \
                          liblustreapi_nodemap.c lustreapi_internal.h \
                          liblustreapi_json.c liblustreapi_layout.c \
@@ -101,10 +102,12 @@ liblustreapi_la_SOURCES = liblustreapi.c liblustreapi_hsm.c \
                          liblustreapi_mirror.c liblustreapi_fid.c \
                          liblustreapi_ladvise.c liblustreapi_chlg.c \
                          liblustreapi_heat.c liblustreapi_pcc.c \
-                         liblustreapi_lseek.c
+                         liblustreapi_lseek.c libhsm_scanner.h \
+                         libhsm_scanner.c
 liblustreapi_la_LDFLAGS = $(LIBREADLINE) -version-info 1:0:0 \
                          -Wl,--version-script=liblustreapi.map
 liblustreapi_la_LIBADD = $(top_builddir)/libcfs/libcfs/libcfs.la
+liblustreapi_la_LIBADD += $(top_builddir)/lnet/utils/lnetconfig/liblnetconfig.la
 
 pkgconfigdir = $(libdir)/pkgconfig
 pkgconfig_DATA = lustre.pc
index ccf0d77..eff36a7 100644 (file)
@@ -11539,14 +11539,14 @@ static int lfs_pcc_detach(int argc, char **argv)
        int                      rc = 0;
        const char              *path;
        char                     fullpath[PATH_MAX];
-       __u32                    detach_opt = PCC_DETACH_OPT_UNCACHE;
+       __u32                    detach_flags = PCC_DETACH_FL_UNCACHE;
 
        optind = 0;
        while ((c = getopt_long(argc, argv, short_opts,
                                long_opts, NULL)) != -1) {
                switch (c) {
                case 'k':
-                       detach_opt = PCC_DETACH_OPT_NONE;
+                       detach_flags = PCC_DETACH_FL_NONE;
                        break;
                case '?':
                        return CMD_HELP;
@@ -11569,7 +11569,7 @@ static int lfs_pcc_detach(int argc, char **argv)
                        continue;
                }
 
-               rc2 = llapi_pcc_detach_file(fullpath, detach_opt);
+               rc2 = llapi_pcc_detach_file(fullpath, detach_flags);
                if (rc2 < 0) {
                        rc2 = -errno;
                        fprintf(stderr,
@@ -11592,14 +11592,14 @@ static int lfs_pcc_detach_fid(int argc, char **argv)
        int              rc = 0;
        const char      *fid;
        const char      *mntpath;
-       __u32            detach_opt = PCC_DETACH_OPT_UNCACHE;
+       __u32            detach_flags = PCC_DETACH_FL_UNCACHE;
 
        optind = 0;
        while ((c = getopt_long(argc, argv, short_opts,
                                long_opts, NULL)) != -1) {
                switch (c) {
                case 'k':
-                       detach_opt = PCC_DETACH_OPT_NONE;
+                       detach_flags = PCC_DETACH_FL_NONE;
                        break;
                case '?':
                        return CMD_HELP;
@@ -11617,7 +11617,7 @@ static int lfs_pcc_detach_fid(int argc, char **argv)
 
                fid = argv[optind++];
 
-               rc2 = llapi_pcc_detach_fid_str(mntpath, fid, detach_opt);
+               rc2 = llapi_pcc_detach_fid_str(mntpath, fid, detach_flags);
                if (rc2 < 0) {
                        fprintf(stderr,
                                "%s: cannot detach '%s' on '%s' from PCC: %s\n",
diff --git a/lustre/utils/libhsm_scanner.c b/lustre/utils/libhsm_scanner.c
new file mode 100644 (file)
index 0000000..62b1b40
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2019, DDN Storage Corporation.
+ */
+/*
+ * lustre/utils/libhsm_scanner.c
+ *
+ * Library for scanning HSM backend fs.
+ *
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+
+#include <stdlib.h>
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <libcfs/util/list.h>
+#include "libhsm_scanner.h"
+
+struct hsm_scan_item {
+       struct list_head        hsi_item;
+       int                     hsi_depth;
+       char                    hsi_pathname[PATH_MAX];
+};
+
+static int hsm_scan_item_alloc(struct list_head *head,
+                              const char *pathname, int depth)
+{
+       struct hsm_scan_item *item;
+       int rc;
+
+       item = malloc(sizeof(struct hsm_scan_item));
+       if (item == NULL) {
+               rc = -ENOMEM;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot allocate hsm item for '%s'", pathname);
+               return rc;
+       }
+
+       item->hsi_depth = depth;
+       strncpy(item->hsi_pathname, pathname, sizeof(item->hsi_pathname));
+       list_add_tail(&item->hsi_item, head);
+
+       return 0;
+}
+
+int hsm_scan_handle_dir(struct hsm_scan_control *hsc, struct list_head *head,
+                       struct hsm_scan_item *item)
+{
+       char fullname[PATH_MAX + NAME_MAX + 1];
+       const char *pathname = item->hsi_pathname;
+       int depth = item->hsi_depth;
+       struct dirent *ent;
+       DIR *dir;
+       int ret;
+       int rc = 0;
+
+       dir = opendir(pathname);
+       if (dir == NULL) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc, "failed to opendir '%s'",
+                           pathname);
+               return rc;
+       }
+
+       while ((ent = readdir(dir)) != NULL) {
+               /* skip "." and ".." */
+               if (strcmp(ent->d_name, ".") == 0 ||
+                   strcmp(ent->d_name, "..") == 0)
+                       continue;
+
+               llapi_printf(LLAPI_MSG_DEBUG,
+                            "check file %d:'%s' under directory '%s'\n",
+                            depth, ent->d_name, pathname);
+               if (depth == 0 && ent->d_type == DT_DIR &&
+                   strcmp(ent->d_name, "shadow") == 0) {
+                       llapi_printf(LLAPI_MSG_DEBUG,
+                                    "skipping check of 'shadow' directory.\n");
+               } else if (depth < 6) {
+                       /* No regular file under this directory level */
+                       if (ent->d_type == DT_REG) {
+                               llapi_printf(LLAPI_MSG_DEBUG,
+                                            "improved HSM layout for '%s' under directory '%s'?\n",
+                                            ent->d_name, pathname);
+                               ret = hsc->hsc_func(pathname, ent->d_name, hsc);
+                               if (ret && !rc) {
+                                       hsc->hsc_errnum++;
+                                       rc = ret;
+                                       /* ignore error, continue to check */
+                               }
+                       } else if (ent->d_type == DT_DIR) {
+                               if (strlen(ent->d_name) + strlen(pathname) + 1
+                                   >= sizeof(fullname)) {
+                                       rc = -ENAMETOOLONG;
+                                       errno = ENAMETOOLONG;
+                                       llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                                         "ignore too long path: %s/%s\n",
+                                                         pathname,
+                                                         ent->d_name);
+                                       hsc->hsc_errnum++;
+                                       continue;
+                               }
+                               snprintf(fullname, sizeof(fullname), "%s/%s",
+                                        pathname, ent->d_name);
+                               rc = hsm_scan_item_alloc(head, fullname,
+                                                        depth + 1);
+                       }
+               } else if (depth == 6) {
+                       /* This is the directory level that should has files */
+                       if (ent->d_type == DT_DIR) {
+                               llapi_err_noerrno(LLAPI_MSG_DEBUG,
+                                                 "ignore too deep subdir '%s' under directory '%s'\n",
+                                                 ent->d_name, pathname);
+                               hsc->hsc_errnum++;
+                       } else if (ent->d_type == DT_REG) {
+                               ret = hsc->hsc_func(pathname, ent->d_name, hsc);
+                               if (ret && !rc) {
+                                       hsc->hsc_errnum++;
+                                       rc = ret;
+                                       /* ignore error, continue to check */
+                               }
+                       }
+               } else {
+                       hsc->hsc_errnum++;
+                       ret = -EINVAL;
+                       llapi_error(LLAPI_MSG_ERROR, ret,
+                                   "ignore too deep directory '%s'", pathname);
+               }
+       }
+
+       if (rc)
+               llapi_error(LLAPI_MSG_ERROR, rc, "failed to handle dir '%s'",
+                           pathname);
+
+       closedir(dir);
+       return rc;
+}
+
+int hsm_scan_process(struct hsm_scan_control *hsc)
+{
+       struct hsm_scan_item *item;
+       struct list_head head;
+       struct stat st;
+       int ret = 0;
+       int rc;
+
+       if (hsc->hsc_type != HSMTOOL_POSIX)
+               return -EOPNOTSUPP;
+
+       rc = stat(hsc->hsc_hsmpath, &st);
+       if (rc) {
+               llapi_error(LLAPI_MSG_ERROR, rc, "failed to stat '%s'",
+                           hsc->hsc_hsmpath);
+               return rc;
+       }
+
+       if (!S_ISDIR(st.st_mode)) {
+               llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                 "HSM root path '%s' must be a directory.",
+                                 hsc->hsc_hsmpath);
+               return -EINVAL;
+       }
+
+       INIT_LIST_HEAD(&head);
+       rc = hsm_scan_item_alloc(&head, hsc->hsc_hsmpath, 0);
+       if (rc)
+               return rc;
+
+       while (!list_empty(&head)) {
+               item = list_entry(head.next, struct hsm_scan_item, hsi_item);
+               list_del(&item->hsi_item);
+               ret = hsm_scan_handle_dir(hsc, &head, item);
+               if (!rc && ret)
+                       rc = ret;
+               free(item);
+       }
+
+       return rc;
+}
diff --git a/lustre/utils/libhsm_scanner.h b/lustre/utils/libhsm_scanner.h
new file mode 100644 (file)
index 0000000..6bc2dfe
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2019, DDN Storage Corporation.
+ */
+/*
+ * lustre/utils/libhsm_scanner.h
+ *
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+#ifndef _LIBHSM_SCANNER_H
+#define _LIBHSM_SCANNER_H
+
+#include <lustre/lustreapi.h>
+
+struct hsm_scan_control;
+
+typedef int (*hsm_scan_func_t)(const char *pname, const char *fname,
+                              struct hsm_scan_control *hsc);
+
+struct hsm_scan_control {
+       enum hsmtool_type        hsc_type;
+       const char              *hsc_mntpath;
+       const char              *hsc_hsmpath;
+       hsm_scan_func_t          hsc_func;
+       int                      hsc_errnum;
+};
+
+int hsm_scan_process(struct hsm_scan_control *hsc);
+
+static inline bool endswith(const char *str, const char *s)
+{
+       size_t len1 = strlen(str);
+       size_t len2 = strlen(s);
+
+       if (len1 < len2)
+               return false;
+
+       return !strcmp(str + len1 - len2, s);
+}
+
+#endif /* LIBHSM_SCANNER_H */
index 2d6a9bd..24468de 100644 (file)
@@ -39,7 +39,9 @@
 #include <unistd.h>
 #include <stdlib.h>
 #include <sys/ioctl.h>
+#include <lnetconfig/cyaml.h>
 #include "lustreapi_internal.h"
+#include "libhsm_scanner.h"
 
 /**
  * Fetch and attach a file to readwrite PCC.
@@ -252,16 +254,16 @@ int llapi_pcc_attach_fid_str(const char *mntpath, const char *fidstr,
  * detach PCC cache of a file by using fd.
  *
  * \param fd           File handle.
- * \param option       Detach option
+ * \param flags                Detach flags.
  *
  * \return 0 on success, an error code otherwise.
  */
-int llapi_pcc_detach_fd(int fd, __u32 option)
+int llapi_pcc_detach_fd(int fd, __u32 flags)
 {
        struct lu_pcc_detach detach;
        int rc;
 
-       detach.pccd_opt = option;
+       detach.pccd_flags = flags;
        rc = ioctl(fd, LL_IOC_PCC_DETACH, &detach);
        return rc;
 }
@@ -271,12 +273,12 @@ int llapi_pcc_detach_fd(int fd, __u32 option)
  *
  * \param mntpath      Fullpath to the client mount point.
  * \param fid          FID of the file.
- * \param option       Detach option.
+ * \param flags                Detach flags.
  *
  * \return 0 on success, an error code otherwise.
  */
 int llapi_pcc_detach_fid(const char *mntpath, const struct lu_fid *fid,
-                        __u32 option)
+                        __u32 flags)
 {
        int rc;
        int fd;
@@ -297,7 +299,7 @@ int llapi_pcc_detach_fid(const char *mntpath, const struct lu_fid *fid,
         * files.
         */
        detach.pccd_fid = *fid;
-       detach.pccd_opt = option;
+       detach.pccd_flags = flags;
        rc = ioctl(fd, LL_IOC_PCC_DETACH_BY_FID, &detach);
        close(fd);
        return rc;
@@ -308,12 +310,12 @@ int llapi_pcc_detach_fid(const char *mntpath, const struct lu_fid *fid,
  *
  * \param mntpath      Fullpath to the client mount point.
  * \param fidstr       FID string of the file.
- * \param option       Detach option.
+ * \param flags                Detach flags.
  *
  * \return 0 on success, an error code otherwise.
  */
 int llapi_pcc_detach_fid_str(const char *mntpath, const char *fidstr,
-                            __u32 option)
+                            __u32 flags)
 {
        int rc;
        struct lu_fid fid;
@@ -330,7 +332,7 @@ int llapi_pcc_detach_fid_str(const char *mntpath, const char *fidstr,
                return -EINVAL;
        }
 
-       rc = llapi_pcc_detach_fid(mntpath, &fid, option);
+       rc = llapi_pcc_detach_fid(mntpath, &fid, flags);
 
        return rc;
 }
@@ -339,11 +341,11 @@ int llapi_pcc_detach_fid_str(const char *mntpath, const char *fidstr,
  * detach PCC cache of a file.
  *
  * \param path         Fullpath to the file to operate on.
- * \param option       Detach option.
+ * \param flags                Detach flags.
  *
  * \return 0 on success, an error code otherwise.
  */
-int llapi_pcc_detach_file(const char *path, __u32 option)
+int llapi_pcc_detach_file(const char *path, __u32 flags)
 {
        int rc;
        int fd;
@@ -356,7 +358,7 @@ int llapi_pcc_detach_file(const char *path, __u32 option)
                return rc;
        }
 
-       rc = llapi_pcc_detach_fd(fd, option);
+       rc = llapi_pcc_detach_fd(fd, flags);
        close(fd);
        return rc;
 }
@@ -519,3 +521,220 @@ out_free_param:
        cfs_free_param_data(&path);
        return rc;
 }
+
+static int llapi_pcc_scan_detach(const char *pname, const char *fname,
+                                struct hsm_scan_control *hsc)
+{
+       char fidstr[FID_LEN];
+       const char *fidname;
+       __u32 detach_flags;
+       bool lov_file;
+       int rc;
+
+       detach_flags = PCC_DETACH_FL_UNCACHE;
+       /* It is the saved lov file when archive on HSM backend. */
+       lov_file = endswith(fname, ".lov");
+       if (lov_file) {
+               size_t len;
+
+               len = strlen(fname) - strlen(".lov");
+               if (len > sizeof(fidstr)) {
+                       rc = -ENAMETOOLONG;
+                       errno = ENAMETOOLONG;
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "Too long PCC-RO fname %s/%s",
+                                   pname, fname);
+                       return rc;
+               }
+               strncpy(fidstr, fname, FID_LEN);
+               fidstr[len] = '\0';
+               detach_flags |= PCC_DETACH_FL_KNOWN_READWRITE;
+               fidname = fidstr;
+       } else {
+               fidname = fname;
+       }
+
+       llapi_printf(LLAPI_MSG_DEBUG, "Handle the file: %s\n", fidname);
+
+       return llapi_pcc_detach_fid_str(hsc->hsc_mntpath, fidname,
+                                       detach_flags);
+}
+
+static int llapi_pcc_del_internal(const char *mntpath, const char *pccpath,
+                                 enum hsmtool_type type, __u32 flags)
+{
+       struct hsm_scan_control hsc;
+       char cmd[PATH_MAX];
+       int rc;
+
+       snprintf(cmd, sizeof(cmd), "del %s", pccpath);
+       rc = llapi_pccdev_set(mntpath, cmd);
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "failed to run '%s' on %s", cmd, mntpath);
+               return rc;
+       }
+
+       if (flags & PCC_CLEANUP_FL_KEEP_DATA)
+               return 0;
+
+       hsc.hsc_type = type;
+       hsc.hsc_mntpath = mntpath;
+       hsc.hsc_hsmpath = pccpath;
+       hsc.hsc_func = llapi_pcc_scan_detach;
+       hsc.hsc_errnum = 0;
+       rc = hsm_scan_process(&hsc);
+
+       return rc;
+}
+
+struct pcc_cmd_handler;
+
+typedef int (*pcc_handler_t)(struct cYAML *node, struct pcc_cmd_handler *pch);
+
+enum pcc_cmd_t {
+       PCC_CMD_DEL,
+       PCC_CMD_CLEAR,
+};
+
+struct pcc_cmd_handler {
+       enum pcc_cmd_t   pch_cmd;
+       bool             pch_iter_cont;
+       bool             pch_flags;
+       const char      *pch_mntpath;
+       const char      *pch_pccpath;
+       pcc_handler_t    pch_cb;
+};
+
+static int llapi_pcc_yaml_cb_helper(struct pcc_cmd_handler *pch)
+{
+       struct cYAML *tree = NULL, *err_rc = NULL, *pcc_node = NULL,
+                    *node = NULL;
+       char pathbuf[sizeof(struct obd_uuid)];
+       glob_t path;
+       int rc;
+
+       rc = llapi_getname(pch->pch_mntpath, pathbuf, sizeof(pathbuf));
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot get name for '%s'\n", pch->pch_mntpath);
+               return rc;
+       }
+
+       rc = cfs_get_param_paths(&path, "llite/%s/pcc", pathbuf);
+       if (rc != 0)
+               return -errno;
+
+       tree = cYAML_build_tree(path.gl_pathv[0], NULL, 0, &err_rc, false);
+       if (!tree) {
+               rc = -EINVAL;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot parse YAML file %s\n", path.gl_pathv[0]);
+               cYAML_build_error(rc, -1, "yaml", "from PCC yaml",
+                                 "can't parse", &err_rc);
+               cYAML_print_tree2file(stderr, err_rc);
+               cYAML_free_tree(err_rc);
+               goto out_free;
+       }
+
+       pcc_node = cYAML_get_object_item(tree, "pcc");
+       if (!pcc_node)
+               goto out_free;
+
+       if (!cYAML_is_sequence(pcc_node)) {
+               rc = -EINVAL;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "bad PCC backend Array!");
+               goto out_free;
+       }
+
+       while (cYAML_get_next_seq_item(pcc_node, &node) != NULL &&
+              pch->pch_iter_cont) {
+               int ret;
+
+               ret = pch->pch_cb(node, pch);
+               if (ret && !rc)
+                       rc = ret;
+       }
+
+       /* Not found the given PCC backend on the client. */
+       if (pch->pch_iter_cont && pch->pch_cmd == PCC_CMD_DEL)
+               rc = -ENOENT;
+
+out_free:
+       if (tree)
+               cYAML_free_tree(tree);
+       cfs_free_param_data(&path);
+       return rc;
+
+}
+
+static int llapi_handle_yaml_pcc_del(struct cYAML *node,
+                                    struct pcc_cmd_handler *pch)
+{
+       struct cYAML *pccpath, *hsmtool;
+       enum hsmtool_type type;
+
+       pccpath = cYAML_get_object_item(node, PCC_YAML_PCCPATH);
+       hsmtool = cYAML_get_object_item(node, PCC_YAML_HSMTOOL);
+
+       if (!pccpath || !pccpath->cy_valuestring ||
+           !hsmtool || !hsmtool->cy_valuestring)
+               return 0;
+
+       if (strcmp(pccpath->cy_valuestring, pch->pch_pccpath))
+               return 0;
+
+       pch->pch_iter_cont = false;
+       type = hsmtool_string2type(hsmtool->cy_valuestring);
+       return llapi_pcc_del_internal(pch->pch_mntpath, pch->pch_pccpath,
+                                     type, pch->pch_flags);
+}
+
+int llapi_pcc_del(const char *mntpath, const char *pccpath, __u32 flags)
+{
+       struct pcc_cmd_handler pch;
+
+       pch.pch_cmd = PCC_CMD_DEL;
+       pch.pch_iter_cont = true;
+       pch.pch_mntpath = mntpath;
+       pch.pch_pccpath = pccpath;
+       pch.pch_flags = flags;
+       pch.pch_cb = llapi_handle_yaml_pcc_del;
+
+       return llapi_pcc_yaml_cb_helper(&pch);
+}
+
+
+static int llapi_handle_yaml_pcc_clear(struct cYAML *node,
+                                      struct pcc_cmd_handler *pch)
+{
+       struct cYAML *pccpath, *hsmtool;
+       enum hsmtool_type type;
+
+       pccpath = cYAML_get_object_item(node, PCC_YAML_PCCPATH);
+       hsmtool = cYAML_get_object_item(node, PCC_YAML_HSMTOOL);
+
+       if (!pccpath || !pccpath->cy_valuestring ||
+           !hsmtool || !hsmtool->cy_valuestring)
+               return 0;
+
+       type = hsmtool_string2type(hsmtool->cy_valuestring);
+       return llapi_pcc_del_internal(pch->pch_mntpath,
+                                     pccpath->cy_valuestring,
+                                     type, pch->pch_flags);
+}
+
+int llapi_pcc_clear(const char *mntpath, __u32 flags)
+{
+       struct pcc_cmd_handler pch;
+
+       pch.pch_cmd = PCC_CMD_CLEAR;
+       pch.pch_iter_cont = true;
+       pch.pch_mntpath = mntpath;
+       pch.pch_pccpath = NULL;
+       pch.pch_flags = flags;
+       pch.pch_cb = llapi_handle_yaml_pcc_clear;
+
+       return llapi_pcc_yaml_cb_helper(&pch);
+}
index fa196a0..f834dab 100644 (file)
@@ -5618,55 +5618,121 @@ int jt_pcc_add(int argc, char **argv)
        snprintf(cmd, PATH_MAX, "add %s %s", pccpath, param);
        rc = llapi_pccdev_set(mntpath, cmd);
        if (rc < 0)
-               fprintf(stderr, "%s: failed to run '%s' on %s\n",
-                       jt_cmdname(argv[0]), cmd, mntpath);
+               fprintf(stderr, "%s: failed to run '%s' on '%s': %s\n",
+                       jt_cmdname(argv[0]), cmd, mntpath, strerror(errno));
 
        return rc;
 }
 
 int jt_pcc_del(int argc, char **argv)
 {
+       static struct option long_opts[] = {
+       { .val = 'k',   .name = "keep-data",    .has_arg = no_argument },
+       { .val = 'v',   .name = "verbose",      .has_arg = no_argument },
+       { .name = NULL } };
+       char fsname[MAX_OBD_NAME + 1];
        const char *mntpath;
        const char *pccpath;
-       char cmd[PATH_MAX];
+       __u32 flags = PCC_CLEANUP_FL_NONE;
+       int verbose = LLAPI_MSG_INFO;
        int rc;
+       int c;
 
-       optind = 1;
-       if (argc != 3) {
-               fprintf(stderr, "%s: require 2 arguments\n",
+       while ((c = getopt_long(argc, argv, "kv", long_opts, NULL)) != -1) {
+               switch (c) {
+               case 'k':
+                       flags = PCC_CLEANUP_FL_KEEP_DATA;
+                       break;
+               case 'v':
+                       verbose++;
+                       break;
+               case '?':
+                       return CMD_HELP;
+               default:
+                       fprintf(stderr, "%s: option '%s' unrecognized\n",
+                               argv[0], argv[optind - 1]);
+                       return CMD_HELP;
+               }
+       }
+       if (optind + 2 != argc) {
+               fprintf(stderr, "%s: must specify mount path and PCC path\n",
                        jt_cmdname(argv[0]));
                return CMD_HELP;
        }
 
        mntpath = argv[optind++];
-       pccpath = argv[optind++];
+       pccpath = argv[optind];
 
-       snprintf(cmd, PATH_MAX, "del %s", pccpath);
-       rc = llapi_pccdev_set(mntpath, cmd);
+       rc = llapi_search_fsname(mntpath, fsname);
+       if (rc < 0) {
+               fprintf(stderr,
+                       "%s: cannot find a Lustre filesystem mounted at '%s'\n",
+                       jt_cmdname(argv[0]), mntpath);
+               return rc;
+       }
+
+       /* Set llapi message level */
+       llapi_msg_set_level(verbose);
+       rc = llapi_pcc_del(mntpath, pccpath, flags);
        if (rc < 0)
-               fprintf(stderr, "%s: failed to run '%s' on %s\n",
-                       jt_cmdname(argv[0]), cmd, mntpath);
+               fprintf(stderr, "%s: failed to delete '%s' on '%s': %s\n",
+                       jt_cmdname(argv[0]), pccpath, mntpath, strerror(errno));
 
        return rc;
 }
 
 int jt_pcc_clear(int argc, char **argv)
 {
+       static struct option long_opts[] = {
+       { .val = 'k',   .name = "keep-data",    .has_arg = no_argument },
+       { .val = 'v',   .name = "verbose",      .has_arg = no_argument },
+       { .name = NULL } };
+       char fsname[MAX_OBD_NAME + 1];
        const char *mntpath;
+       __u32 flags = PCC_CLEANUP_FL_NONE;
+       int verbose = LLAPI_MSG_INFO;
        int rc;
+       int c;
 
-       optind = 1;
-       if (argc != 2) {
-               fprintf(stderr, "%s: require 1 arguments\n",
+       while ((c = getopt_long(argc, argv, "kv", long_opts, NULL)) != -1) {
+               switch (c) {
+               case 'k':
+                       flags = PCC_CLEANUP_FL_KEEP_DATA;
+                       break;
+               case 'v':
+                       verbose++;
+                       break;
+               case '?':
+                       return CMD_HELP;
+               default:
+                       fprintf(stderr, "%s: option '%s' unrecognized\n",
+                               argv[0], argv[optind - 1]);
+                       return CMD_HELP;
+               }
+       }
+       if (optind + 1 != argc) {
+               fprintf(stderr, "%s: must speficy mount path\n",
                        jt_cmdname(argv[0]));
                return CMD_HELP;
        }
 
        mntpath = argv[optind];
-       rc = llapi_pccdev_set(mntpath, "clear");
-       if (rc < 0)
-               fprintf(stderr, "%s: failed to run 'clear' on %s\n",
+
+       rc = llapi_search_fsname(mntpath, fsname);
+       if (rc < 0) {
+               fprintf(stderr,
+                       "%s: cannot find a Lustre filesystem mounted at '%s'\n",
                        jt_cmdname(argv[0]), mntpath);
+               return rc;
+       }
+
+       /* Set llapi message level */
+       llapi_msg_set_level(verbose);
+       rc = llapi_pcc_clear(mntpath, flags);
+       if (rc < 0)
+               fprintf(stderr,
+                       "%s: failed to remove all PCC backends on '%s': %s\n",
+                       jt_cmdname(argv[0]), mntpath, strerror(errno));
 
        return rc;
 }
@@ -5686,8 +5752,8 @@ int jt_pcc_list(int argc, char **argv)
        mntpath = argv[optind];
        rc = llapi_pccdev_get(mntpath);
        if (rc < 0)
-               fprintf(stderr, "%s: failed to run 'pcc list' on %s\n",
-                       jt_cmdname(argv[0]), mntpath);
+               fprintf(stderr, "%s: failed to run 'pcc list' on '%s': %s\n",
+                       jt_cmdname(argv[0]), mntpath, strerror(errno));
 
        return rc;
 }