Whamcloud - gitweb
LU-10092 llite: Add persistent cache on client 63/32963/38
authorLi Xi <lixi@ddn.com>
Tue, 27 Jun 2017 12:18:14 +0000 (20:18 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 13 Jun 2019 04:29:12 +0000 (04:29 +0000)
PCC is a new framework which provides a group of local caches
on the Lustre client side. No global namespace will be provided
by PCC. Each client uses its own local storage as a cache for
itself. Local file system is used to manage the data on local
caches. Cached I/O is directed to local filesystem while
normal I/O is directed to OSTs.

PCC uses HSM for data synchronization. It uses HSM copytool
to restore file from local caches to Lustre OSTs. Each PCC
has a copytool instance running with unique archive number.
Any remote access from another Lustre client would trigger
the data synchronization. If a client with PCC goes offline,
the cached data becomes inaccessible to other clients
temporarily. After the PCC client reboots and the copytool
restarts, the data will be accessible again.

ToDo:
1) Make PCC exclusive with HSM.
2) Strong size consistency for PCC cached files among clients.
3) Support to cache partial content of a file.

Change-Id: I188ed36c48aae223380739f607cc6caf2f789298
Test-Parameters: clientcount=3 testlist=sanity-pcc,sanity-pcc,sanity-pcc
Signed-off-by: Li Xi <lixi@ddn.com>
Signed-off-by: Wang Shilong <wshilong@ddn.com>
Signed-off-by: Qian Yingjin <qian@ddn.com>
Reviewed-on: https://review.whamcloud.com/32963
Tested-by: Jenkins
Reviewed-by: Patrick Farrell <pfarrell@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
54 files changed:
lustre/autoconf/lustre-core.m4
lustre/doc/Makefile.am
lustre/doc/lctl-pcc.8 [new file with mode: 0644]
lustre/doc/lctl.8
lustre/doc/lfs-pcc.1 [new file with mode: 0644]
lustre/doc/lfs.1
lustre/doc/llapi_pcc_attach.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_detach_fid.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_detach_fid_fd.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_detach_fid_str.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_detach_file.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_state_get.3 [new file with mode: 0644]
lustre/doc/llapi_pcc_state_get_fd.3 [new file with mode: 0644]
lustre/doc/llapi_pccdev_get.3 [new file with mode: 0644]
lustre/doc/llapi_pccdev_set.3 [new file with mode: 0644]
lustre/include/lustre/lustreapi.h
lustre/include/lustre_compat.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/Makefile.in
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_mmap.c
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/pcc.c [new file with mode: 0644]
lustre/llite/pcc.h [new file with mode: 0644]
lustre/llite/super25.c
lustre/llite/xattr.c
lustre/llite/xattr26.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_lib.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/tests/Makefile.am
lustre/tests/replay-vbr.sh
lustre/tests/sanity-hsm.sh
lustre/tests/sanity-pcc.sh [new file with mode: 0644]
lustre/tests/sanity-quota.sh
lustre/tests/test-framework.sh
lustre/tests/test-groups/regression
lustre/utils/Makefile.am
lustre/utils/lctl.c
lustre/utils/lfs.c
lustre/utils/lhsmtool_posix.c
lustre/utils/liblustreapi_pcc.c [new file with mode: 0644]
lustre/utils/obd.c
lustre/utils/obdctl.h

index 45271d6..6df57bc 100644 (file)
@@ -638,6 +638,23 @@ fs_struct_seqcount, [
 ])
 ]) # LC_FS_STRUCT_SEQCOUNT
 
+# LC_DENTRY_PATH_RAW
+#
+# Kernel version 2.6.37 commit ec2447c278ee973d35f38e53ca16ba7f965ae33d
+# dentry_path_raw is exported
+#
+AC_DEFUN([LC_DENTRY_PATH_RAW], [
+LB_CHECK_COMPILE([if 'dentry_path_raw' exist],
+dentry_path_raw, [
+       #include <linux/dcache.h>
+],[
+       dentry_path_raw(NULL, NULL, 0);
+],[
+       AC_DEFINE(HAVE_DENTRY_PATH_RAW, 1,
+               ['dentry_path_raw' is available])
+])
+]) # LC_DENTRY_PATH_RAW
+
 #
 # LC_D_COMPARE_7ARGS
 #
@@ -1001,6 +1018,23 @@ security_inode_init_security_callback, [
 ]) # LC_HAVE_SECURITY_IINITSEC
 
 #
+# 2.6.39 vfs_create takes a 'struct nameidata' parameter
+#
+AC_DEFUN([LC_VFS_CREATE_USE_NAMEIDATA], [
+LB_CHECK_COMPILE([if vfs_create takes a struct nameidata parameter],
+vfs_create, [
+       #include <linux/namei.h>
+       #include <linux/fs.h>
+],[
+       struct nameidata *nd;
+       vfs_create(NULL, NULL, 0, nd);
+],[
+       AC_DEFINE(HAVE_VFS_CREATE_USE_NAMEIDATA, 1,
+               [vfs_create use nameidata as parameter])
+])
+]) # LC_VFS_CREATE_USE_NAMEIDATA
+
+#
 # LC_HAVE_MIGRATE_HEADER
 #
 # 3.3 introduces migrate_mode.h and migratepage has 4 args
@@ -1430,6 +1464,23 @@ is_sxid, [
 ]) # LC_HAVE_IS_SXID
 
 #
+# LC_HAVE_VFS_GETATTR_2ARGS
+#
+AC_DEFUN([LC_HAVE_VFS_GETATTR_2ARGS], [
+LB_CHECK_COMPILE([if vfs_getattr takes 2 args],
+vfs_getattr, [
+       #include <linux/fs.h>
+],[
+       struct path path;
+
+       vfs_getattr(&path, NULL);
+],[
+       AC_DEFINE(HAVE_VFS_GETATTR_2ARGS, 1,
+               [vfs_getattr takes 2 args])
+])
+]) # LC_HAVE_VFS_GETATTR_2ARGS
+
+#
 # LC_HAVE_REMOVE_PROC_SUBTREE
 #
 # 3.10 introduced remove_proc_subtree
@@ -1769,6 +1820,23 @@ vfs_unlink_3args, [
 ])
 ]) # LC_VFS_UNLINK_3ARGS
 
+# LC_HAVE_D_IS_POSITIVE
+#
+# Kernel version 3.13 b18825a7c8e37a7cf6abb97a12a6ad71af160de7
+# d_is_positive is added
+#
+AC_DEFUN([LC_HAVE_D_IS_POSITIVE], [
+LB_CHECK_COMPILE([if 'd_is_positive' exist],
+d_is_positive, [
+       #include <linux/dcache.h>
+],[
+       d_is_positive(NULL);
+],[
+       AC_DEFINE(HAVE_D_IS_POSITIVE, 1,
+               ['d_is_positive' is available])
+])
+]) # LC_HAVE_D_IS_POSITIVE
+
 #
 # LC_HAVE_BVEC_ITER
 #
@@ -3120,6 +3188,7 @@ AC_DEFUN([LC_PROG_LINUX], [
        # 2.6.37
        LC_KERNEL_LOCKED
        LC_FS_STRUCT_SEQCOUNT
+       LC_DENTRY_PATH_RAW
 
        # 2.6.38
        LC_BLKDEV_GET_BY_DEV
@@ -3135,6 +3204,7 @@ AC_DEFUN([LC_PROG_LINUX], [
        LC_HAVE_FSTYPE_MOUNT
        LC_HAVE_INODE_OWNER_OR_CAPABLE
        LC_HAVE_SECURITY_IINITSEC
+       LC_VFS_CREATE_USE_NAMEIDATA
 
        # 3.0
        LC_DIRTY_INODE_WITH_FLAG
@@ -3186,6 +3256,7 @@ AC_DEFUN([LC_PROG_LINUX], [
        LC_HAVE_HLIST_FOR_EACH_3ARG
        LC_HAVE_BIO_END_SECTOR
        LC_HAVE_IS_SXID
+       LC_HAVE_VFS_GETATTR_2ARGS
 
        # 3.10
        LC_HAVE_REMOVE_PROC_SUBTREE
@@ -3212,6 +3283,7 @@ AC_DEFUN([LC_PROG_LINUX], [
        # 3.13
        LC_VFS_RENAME_5ARGS
        LC_VFS_UNLINK_3ARGS
+       LC_HAVE_D_IS_POSITIVE
 
        # 3.14
        LC_HAVE_BVEC_ITER
index b18525c..987b25f 100644 (file)
@@ -53,6 +53,7 @@ MANFILES =                                    \
        lfs-mirror-split.1                      \
        lfs-mirror-verify.1                     \
        lfs-mkdir.1                             \
+       lfs-pcc.1                               \
        lfs-setdirstripe.1                      \
        lfs-setstripe.1                         \
        lfs-setquota.1                          \
diff --git a/lustre/doc/lctl-pcc.8 b/lustre/doc/lctl-pcc.8
new file mode 100644 (file)
index 0000000..45ba840
--- /dev/null
@@ -0,0 +1,48 @@
+.TH lctl-pcc 8 2019-04-15 "Lustre" "configuration Utilities"
+.SH NAME
+lctl-pcc \- lctl commands used to interact with PCC features
+.SH SYNOPSIS
+.B lctl pcc add \fR<\fImntpath\fR> <\fIpccpath\fR> [\fB--param\fR|\fB-p\fR <\fIparam\fR>]
+.br
+.B lctl pcc del <\fImntpath\fR> <\fIpccpath\fR>
+.br
+.B lctl pcc clear <\fImntpath\fR>
+.br
+.B lctl pcc list <\fImntpath\fR>
+.SH DESCRIPTION
+.TP
+.B lctl pcc add \fR<\fImntpath\fR> <\fIpccpath\fR> [\fB--param\fR|\fB-p\fR <\fIparam\fR>]
+Add a PCC backend specified by HSM root path
+.IR pccpath
+on a Lustre filesystem client instance with the mount point referenced by
+.IR mntpath .
+The parameter
+.IR param
+is a string used to configure the PCC backend, such as the read-write attach ID
+(archive ID) and the auto-caching project ID. For example, the string "2 100"
+sets the read-write attach ID to 2 and the project ID to 100 for the PCC backend. On this client
+any subsequently created files with the project ID of 100 will be persistently
+cached automatically.
+.TP
+.B lctl pcc del <\fImntpath\fR> <\fIpccpath\fR>
+Delete a PCC backend specified by path
+.IR pccpath
+on a Lustre client referenced by the mount point of
+.IR mntpath .
+.TP
+.B lctl pcc clear <\fImntpath\fR>
+Remove all PCC backends on a Lustre client referenced by the mount point of
+.IR mntpath .
+.TP
+.B lctl pcc list <\fImntpath\fR>
+List all PCC backends on a Lustre client referenced by the mount point of
+.IR mntpath .
+.SH OPTIONS
+.TP
+.B --param | -p
+Specifies the configuration parameters for a PCC backend.
+.TP
+.SH SEE ALSO
+.BR lfs (1),
+.BR lfs-hsm (1),
+.BR lfs-pcc (1)
index 84e6920..b3be0e8 100644 (file)
@@ -593,4 +593,5 @@ filesystem package.
 .BR lctl-nodemap-del-range (8),
 .BR lctl-nodemap-del (8),
 .BR lctl-nodemap-modify (8),
+.BR lctl-pcc (8),
 .BR lfs (1)
diff --git a/lustre/doc/lfs-pcc.1 b/lustre/doc/lfs-pcc.1
new file mode 100644 (file)
index 0000000..e4746f5
--- /dev/null
@@ -0,0 +1,71 @@
+.TH LFS-PCC 1 2019-04-15 "Lustre" "Lustre Utilities"
+.SH NAME
+lfs-pcc \- lfs commands used to interact with PCC features
+.SH SYNOPSIS
+.B lfs pcc attach <\fB--id\fR|\fB-i\fR NUM>  <\fIfile \fR...>
+.br
+.B lfs pcc detach <\fIfile \fR...>
+.br
+.B lfs pcc detach_fid <\fImntpath\fR> <\fIfid \fR...>
+.br
+.B lfs pcc state <\fIfile \fR...>
+.SH DESCRIPTION
+.TP
+.B lfs pcc attach <\fB--id\fR|\fB-i\fR NUM>  <\fIfile \fR...>
+Attach given files on the persistent client cache.
+.TP
+.B lfs pcc detach <\fIfile \fR...>
+Detach given files from the persistent client cache.
+.TP
+.B lfs pcc detach_fid <\fImntpath\fR> <\fIfid \fR...>
+Detach files from the persistent client cache by FID(s).
+.TP
+.B lfs pcc state <\fIfile \fR...>
+Display the PCC state for given files.
+.TP
+.SH OPTIONS
+.TP
+.B --id | -i
+For RW-PCC, this is the HSM archive ID that selects which backend is used to cache the files.
+.TP
+Before using RW-PCC, you need to configure HSM root and Archive ID mapping properly:
+.TP
+.B lctl pcc add $MNTPATH $PCCPATH "$ARCHIVE_ID $PROJID"
+Add one PCC backend to the Lustre client. You need to specify the HSM root,
+archive ID, and project ID. On this client any subsequently created
+files with this project ID will be persistently cached automatically.
+.TP
+.B lctl pcc del $MNTPATH $PCCPATH
+Delete one PCC backend.
+.TP
+.B lctl pcc clear $MNTPATH
+Clear and remove all PCC backends for the client.
+.TP
+.SH EXAMPLES
+.TP
+.B # lctl set_param mdt.$FSNAME-MDT0000.hsm_control=enabled
+Enable HSM on the appropriate MDT.
+.TP
+.B # lhsmtool_posix --daemon --hsm-root /mnt/pcc/ --archive=1 /mnt/lustre
+Launch one copytool on client node to connect cache storage.
+.TP
+.B # lctl pcc add /mnt/lustre /mnt/pcc \ "1\ 1"
+Add HSM root and Archive ID mapping for RW-PCC.
+.TP
+.B $ lfs pcc attach -i 1 /mnt/lustre/file
+Attach an existing file to PCC and migrate its data from Lustre to the cache device;
+any subsequent I/O to the Lustre file is directed to the RW-PCC copy.
+.TP
+.B $ lfs pcc detach /mnt/lustre/file
+Detach the file from RW-PCC; I/O to the file goes to Lustre after this command.
+.TP
+.B $ lfs pcc state /mnt/lustre/file
+.br
+file: /mnt/lustre/file, type: readwrite, PCC file: /mnt/pcc/0004/0000/0bd1/0000/0002/0000/0x200000bd1:0x4:0x0, user number: 1, flags: 6
+.br
+Display the PCC state of the file "/mnt/lustre/file".
+.TP
+.SH SEE ALSO
+.BR lfs (1),
+.BR lfs-hsm (1),
+.BR lctl-pcc (8)
index 5b1177b..d508e0d 100644 (file)
@@ -319,4 +319,5 @@ The lfs command is part of the Lustre filesystem.
 .BR lfs-setdirstripe (1),
 .BR lfs-setquota (1),
 .BR lfs-setstripe (1),
+.BR lfs-pcc (1),
 .BR lustre (7)
diff --git a/lustre/doc/llapi_pcc_attach.3 b/lustre/doc/llapi_pcc_attach.3
new file mode 100644 (file)
index 0000000..87a9202
--- /dev/null
@@ -0,0 +1,62 @@
+.TH llapi_pcc_attach 3 "2019 April 20" "Lustre User API"
+.SH NAME
+llapi_pcc_attach \- attach a file into PCC
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pcc_attach(const char *" path ", __u32 " id ,
+.BI "                     enum lu_pcc_type " type ");"
+.fi
+.SH DESCRIPTION
+.PP
+The function
+.B llapi_pcc_attach()
+tries to attach the file referenced by
+.BR path
+into PCC backend. PCC provides a group of local caches and works in two modes:
+RW-PCC enables a read-write cache on the local SSDs of a single client; RO-PCC
+provides a read-only cache on the local SSDs of multiple clients. For RW-PCC,
+the argument
+.I id
+is the archive ID of the copytool agent running on this client. By default,
+the RO-PCC attach ID is set to the same value as the RW-PCC attach ID for a PCC
+backend that is also used for read-only caching. The attach mode is specified by the
+.I type
+argument, which is an
+.B enum lu_pcc_type
+with one of the following values:
+.nf
+.LP
+       LU_PCC_NONE
+       LU_PCC_READWRITE
+       LU_PCC_READONLY
+.fi
+.TP
+LU_PCC_NONE
+means that the file is not cached on PCC.
+.TP
+LU_PCC_READWRITE
+means RW-PCC mode.
+.TP
+LU_PCC_READONLY
+means RO-PCC mode.
+.SH RETURN VALUES
+.PP
+.B llapi_pcc_attach()
+returns 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC attach operation is not supported.
+.SH "SEE ALSO"
+.BR lustreapi (7)
diff --git a/lustre/doc/llapi_pcc_detach_fid.3 b/lustre/doc/llapi_pcc_detach_fid.3
new file mode 100644 (file)
index 0000000..1330fa6
--- /dev/null
@@ -0,0 +1 @@
+.so man3/llapi_pcc_detach_fid_fd.3
diff --git a/lustre/doc/llapi_pcc_detach_fid_fd.3 b/lustre/doc/llapi_pcc_detach_fid_fd.3
new file mode 100644 (file)
index 0000000..0afe243
--- /dev/null
@@ -0,0 +1,65 @@
+.TH llapi_pcc_detach_fid_fd 3 "2019 April 20" "Lustre User API"
+.SH NAME
+llapi_pcc_detach_fid_fd, llapi_pcc_detach_fid, llapi_pcc_detach_fid_str,
+llapi_pcc_detach_file \- detach the given file from PCC
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pcc_detach_fid_fd(int " dirfd ", const struct lu_fid *" fid ");"
+.PP
+.BI "int llapi_pcc_detach_fid(const char *" mntpath ", const struct lu_fid *" fid ");"
+.PP
+.BI "int llapi_pcc_detach_fid_str(const char *" mntpath ", const char *" fidstr ");"
+.PP
+.BI "int llapi_pcc_detach_file(const char *" path ");"
+.fi
+.SH DESCRIPTION
+.PP
+.BR llapi_pcc_detach_fid_fd() ,
+.BR llapi_pcc_detach_fid() ,
+.BR llapi_pcc_detach_fid_str() ,
+and
+.BR llapi_pcc_detach_file()
+detach a cached file from PCC by an ioctl on the directory. The file is referenced
+by
+.IR fid ,
+.IR fidstr ,
+or
+.IR path .
+The directory, which is usually a mount point that the copytool already has open,
+is referenced by
+.IR dirfd ,
+.IR mntpath ", or"
+.IR path .
+.SH RETURN VALUES
+.LP
+.BR llapi_pcc_detach_fid_fd() ,
+.BR llapi_pcc_detach_fid() ,
+.BR llapi_pcc_detach_fid_str() ,
+and
+.B llapi_pcc_detach_file()
+return 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC state operation is not supported.
+.TP
+.SM -ENOTTY
+File does not reside on a Lustre filesystem.
+.TP
+.SM -ENOENT
+.I path
+does not exist.
+.SH "SEE ALSO"
+.BR llapi_pcc_attach (3),
+.BR lustreapi (7)
diff --git a/lustre/doc/llapi_pcc_detach_fid_str.3 b/lustre/doc/llapi_pcc_detach_fid_str.3
new file mode 100644 (file)
index 0000000..1330fa6
--- /dev/null
@@ -0,0 +1 @@
+.so man3/llapi_pcc_detach_fid_fd.3
diff --git a/lustre/doc/llapi_pcc_detach_file.3 b/lustre/doc/llapi_pcc_detach_file.3
new file mode 100644 (file)
index 0000000..1330fa6
--- /dev/null
@@ -0,0 +1 @@
+.so man3/llapi_pcc_detach_fid_fd.3
diff --git a/lustre/doc/llapi_pcc_state_get.3 b/lustre/doc/llapi_pcc_state_get.3
new file mode 100644 (file)
index 0000000..2d2a9d2
--- /dev/null
@@ -0,0 +1,73 @@
+.TH llapi_pcc_state_get 3 "2019 April 20" "Lustre User API"
+.SH NAME
+llapi_pcc_state_get, llapi_pcc_state_get_fd \- get the current PCC state
+related to a file
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pcc_state_get(const char *" path ", struct lu_pcc_state *" state ");"
+.PP
+.BI "int llapi_pcc_state_get_fd(int " fd ", struct lu_pcc_state *" state ");"
+.fi
+.SH DESCRIPTION
+.PP
+The functions
+.BR llapi_pcc_state_get()
+and
+.BR llapi_pcc_state_get_fd()
+return the PCC state information for the file referenced by
+.IR path
+or
+.IR fd .
+Information is returned in the
+.IR state
+argument which should be already allocated, which is a
+.B lu_pcc_state
+data structure, which contains the following fields:
+.nf
+.LP
+struct lu_pcc_state {
+       __u32   pccs_type; /* enum lu_pcc_type */
+       __u32   pccs_open_count;
+       __u32   pccs_flags; /* enum lu_pcc_state_flags */
+       __u32   pccs_padding;
+       char    pccs_path[PATH_MAX];
+};
+.fi
+.TP
+.I pccs_type
+specifies the PCC mode for the given file, which is actually an
+.B enum lu_pcc_type
+value.
+.TP
+.I pccs_open_count
+indicates the opener count for the given file on the client.
+.TP
+.I pccs_flags
+holds the PCC flags for the given file; it is not currently used.
+.TP
+.I pccs_path
+is the full path of the cached file on the PCC backend.
+.SH RETURN VALUES
+.PP
+.B llapi_pcc_state_get()
+and
+.B llapi_pcc_state_get_fd()
+return 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC state operation is not supported.
+.SH "SEE ALSO"
+.BR llapi_pcc_attach (3),
+.BR lustreapi (7)
diff --git a/lustre/doc/llapi_pcc_state_get_fd.3 b/lustre/doc/llapi_pcc_state_get_fd.3
new file mode 100644 (file)
index 0000000..da4b39f
--- /dev/null
@@ -0,0 +1 @@
+.so man3/llapi_pcc_state_get.3
diff --git a/lustre/doc/llapi_pccdev_get.3 b/lustre/doc/llapi_pccdev_get.3
new file mode 100644 (file)
index 0000000..b317243
--- /dev/null
@@ -0,0 +1,36 @@
+.TH llapi_pccdev_get 3 "2019 April 20" "Lustre User API"
+.SH NAME
+llapi_pccdev_get \- List all PCC backends on a client
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pccdev_get(const char *" path ");"
+.fi
+.SH DESCRIPTION
+.PP
+The function
+.BR llapi_pccdev_get()
+lists all PCC backends on the client with the mount point referenced by
+.IR path ,
+and outputs the results to stdout in YAML format.
+.SH RETURN VALUES
+.PP
+.B llapi_pccdev_get()
+returns 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC backend operation is not supported.
+.SH "SEE ALSO"
+.BR llapi_pccdev_set (3),
+.BR lustreapi (7)
diff --git a/lustre/doc/llapi_pccdev_set.3 b/lustre/doc/llapi_pccdev_set.3
new file mode 100644 (file)
index 0000000..14f1010
--- /dev/null
@@ -0,0 +1,48 @@
+.TH llapi_pccdev_set 3 "2019 April 20" "Lustre User API"
+.SH NAME
+llapi_pccdev_set \- Add/delete a PCC backend on a client
+.SH SYNOPSIS
+.nf
+.B #include <lustre/lustreapi.h>
+.PP
+.BI "int llapi_pccdev_set(const char *" path ", const char *" cmd ");"
+.fi
+.SH DESCRIPTION
+.PP
+The function
+.BR llapi_pccdev_set()
+adds or deletes a PCC backend on the client with the mount point referenced by
+.IR path .
+The input argument
+.IR cmd
+could be in the following forms:
+.TP
+.B \ "add\ $PCCPATH\ $PARAM"
+Add a PCC backend referenced by the HSM root path
+.IR $PCCPATH .
+.TP
+.B \ "del\ $PCCPATH"
+Delete a PCC backend referenced by the HSM root path
+.IR $PCCPATH .
+.TP
+.B \ "clear"
+Clear and remove all PCC backends on a client.
+.SH RETURN VALUES
+.PP
+.B llapi_pccdev_set()
+returns 0 on success or a negative errno value on failure.
+.SH ERRORS
+.TP 15
+.SM -ENOMEM
+Insufficient memory to complete operation.
+.TP
+.SM -EFAULT
+Memory region is not properly mapped.
+.TP
+.SM -EINVAL
+One or more invalid arguments are given.
+.TP
+.SM -EOPNOTSUPP
+PCC backend operation is not supported.
+.SH "SEE ALSO"
+.BR lustreapi (7)
index 2ae985c..9b58d53 100644 (file)
@@ -518,6 +518,17 @@ int llapi_group_unlock(int fd, int gid);
 /* Ladvise */
 int llapi_ladvise(int fd, unsigned long long flags, int num_advise,
                  struct llapi_lu_ladvise *ladvise);
+
+/* PCC */
+int llapi_pcc_attach(const char *path, __u32 id, enum lu_pcc_type type);
+int llapi_pcc_detach_fid_fd(int fd, const struct lu_fid *fid);
+int llapi_pcc_detach_fid(const char *mntpath, const struct lu_fid *fid);
+int llapi_pcc_detach_fid_str(const char *mntpath, const char *fidstr);
+int llapi_pcc_detach_file(const char *path);
+int llapi_pcc_state_get_fd(int fd, struct lu_pcc_state *state);
+int llapi_pcc_state_get(const char *path, struct lu_pcc_state *state);
+int llapi_pccdev_set(const char *mntpath, const char *cmd);
+int llapi_pccdev_get(const char *mntpath);
 /** @} llapi */
 
 /* llapi_layout user interface */
index 1b74aa4..284f4ee 100644 (file)
@@ -378,6 +378,37 @@ static inline struct inode *file_inode(const struct file *file)
 #define ll_vfs_unlink(a, b) vfs_unlink(a, b)
 #endif
 
+#ifndef HAVE_INODE_OWNER_OR_CAPABLE
+#define inode_owner_or_capable(inode) is_owner_or_cap(inode)
+#endif
+
+static inline int ll_vfs_getattr(struct path *path, struct kstat *st)
+{
+       int rc;
+
+#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
+       rc = vfs_getattr(path, st, STATX_BASIC_STATS, AT_STATX_SYNC_AS_STAT);
+#elif defined HAVE_VFS_GETATTR_2ARGS
+       rc = vfs_getattr(path, st);
+#else
+       rc = vfs_getattr(path->mnt, path->dentry, st);
+#endif
+       return rc;
+}
+
+#ifndef HAVE_D_IS_POSITIVE
+static inline bool d_is_positive(const struct dentry *dentry)
+{
+       return dentry->d_inode != NULL;
+}
+#endif
+
+#ifdef HAVE_VFS_CREATE_USE_NAMEIDATA
+# define LL_VFS_CREATE_FALSE NULL
+#else
+# define LL_VFS_CREATE_FALSE false
+#endif
+
 #ifndef HAVE_INODE_LOCK
 # define inode_lock(inode) mutex_lock(&(inode)->i_mutex)
 # define inode_unlock(inode) mutex_unlock(&(inode)->i_mutex)
index fb8c67f..cd74df3 100644 (file)
@@ -156,6 +156,9 @@ struct md_op_spec {
        void            *sp_cr_file_secctx; /* xattr value */
        size_t           sp_cr_file_secctx_size; /* xattr value size */
 
+       /* Archive ID used for auto PCC attach when creating new files. */
+       __u32            sp_archive_id;
+
        /** don't create lov objects or llog cookie - this replay */
        unsigned int no_create:1,
                     sp_cr_lookup:1, /* do lookup sanity check or not. */
index b25c9d4..7f3c890 100644 (file)
@@ -917,6 +917,8 @@ struct md_op_data {
        bool                    op_post_migrate;
        /* used to access dir with bash hash */
        __u32                   op_stripe_index;
+       /* Archive ID for PCC attach */
+       __u32                   op_archive_id;
 };
 
 struct md_callback {
index 9557039..b0092c2 100644 (file)
@@ -898,7 +898,8 @@ struct ptlrpc_body_v2 {
                                OBD_CONNECT2_ARCHIVE_ID_ARRAY | \
                                OBD_CONNECT2_SELINUX_POLICY | \
                                OBD_CONNECT2_LSOM | \
-                               OBD_CONNECT2_ASYNC_DISCARD)
+                               OBD_CONNECT2_ASYNC_DISCARD | \
+                               OBD_CONNECT2_PCC)
 
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
@@ -1952,6 +1953,7 @@ enum mds_op_bias {
        MDS_CLOSE_RESYNC_DONE   = 1 << 16,
        MDS_CLOSE_LAYOUT_SPLIT  = 1 << 17,
        MDS_TRUNC_KEEP_LEASE    = 1 << 18,
+       MDS_PCC_ATTACH          = 1 << 19,
 };
 
 #define MDS_CLOSE_INTENT (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP |         \
@@ -1974,7 +1976,10 @@ struct mdt_rec_create {
        struct lu_fid   cr_fid2;
        struct lustre_handle cr_open_handle_old; /* in case of open replay */
        __s64           cr_time;
-       __u64           cr_rdev;
+       union {
+               __u64           cr_rdev;
+               __u32           cr_archive_id;
+       };
        __u64           cr_ioepoch;
        __u64           cr_padding_1;   /* rr_blocks */
        __u32           cr_mode;
@@ -3545,6 +3550,8 @@ struct close_data {
                struct close_data_resync_done   cd_resync;
                /* split close */
                __u16                           cd_mirror_id;
+               /* PCC release */
+               __u32                           cd_archive_id;
        };
 };
 
index 0bad5c7..d9567f7 100644 (file)
@@ -393,6 +393,7 @@ enum ll_lease_flags {
        LL_LEASE_RESYNC_DONE    = 0x2,
        LL_LEASE_LAYOUT_MERGE   = 0x4,
        LL_LEASE_LAYOUT_SPLIT   = 0x8,
+       LL_LEASE_PCC_ATTACH     = 0x10,
 };
 
 #define IOC_IDS_MAX    4096
@@ -481,6 +482,8 @@ struct ll_ioc_lease_id {
 #define LL_IOC_LADVISE                 _IOR('f', 250, struct llapi_lu_ladvise)
 #define LL_IOC_HEAT_GET                        _IOWR('f', 251, struct lu_heat)
 #define LL_IOC_HEAT_SET                        _IOW('f', 251, __u64)
+#define LL_IOC_PCC_DETACH              _IOW('f', 252, struct lu_pcc_detach)
+#define LL_IOC_PCC_STATE               _IOR('f', 252, struct lu_pcc_state)
 
 #ifndef        FS_IOC_FSGETXATTR
 /*
@@ -1214,12 +1217,15 @@ enum la_valid {
 #define MDS_OPEN_RELEASE   02000000000000ULL /* Open the file for HSM release */
 
 #define MDS_OPEN_RESYNC    04000000000000ULL /* FLR: file resync */
+#define MDS_OPEN_PCC      010000000000000ULL /* PCC: auto RW-PCC cache attach
+                                             * for newly created file */
 
 /* lustre internal open flags, which should not be set from user space */
 #define MDS_OPEN_FL_INTERNAL (MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS |    \
                              MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK |  \
                              MDS_OPEN_BY_FID | MDS_OPEN_LEASE |        \
-                             MDS_OPEN_RELEASE | MDS_OPEN_RESYNC)
+                             MDS_OPEN_RELEASE | MDS_OPEN_RESYNC |      \
+                             MDS_OPEN_PCC)
 
 
 /********* Changelogs **********/
@@ -2294,6 +2300,47 @@ struct lu_heat {
        __u64 lh_heat[0];
 };
 
+enum lu_pcc_type {
+       LU_PCC_NONE = 0,
+       LU_PCC_READWRITE,
+       LU_PCC_MAX
+};
+
+static inline const char *pcc_type2string(enum lu_pcc_type type)
+{
+       switch (type) {
+       case LU_PCC_NONE:
+               return "none";
+       case LU_PCC_READWRITE:
+               return "readwrite";
+       default:
+               return "fault";
+       }
+}
+
+struct lu_pcc_attach {
+       __u32 pcca_type; /* PCC type */
+       __u32 pcca_id; /* archive ID for readwrite, group ID for readonly */
+};
+
+struct lu_pcc_detach {
+       /* fid of the file to detach */
+       struct lu_fid   pccd_fid;
+};
+
+enum lu_pcc_state_flags {
+       /* Whether the inode attr is cached locally */
+       PCC_STATE_FLAG_ATTR_VALID       = 0x1,
+};
+
+struct lu_pcc_state {
+       __u32   pccs_type; /* enum lu_pcc_type */
+       __u32   pccs_open_count;
+       __u32   pccs_flags; /* enum lu_pcc_state_flags */
+       __u32   pccs_padding;
+       char    pccs_path[PATH_MAX];
+};
+
 #if defined(__cplusplus)
 }
 #endif
index a2414bd..98dd203 100644 (file)
@@ -9,10 +9,10 @@ lustre-objs += glimpse.o
 lustre-objs += lcommon_cl.o
 lustre-objs += lcommon_misc.o
 lustre-objs += vvp_dev.o vvp_page.o vvp_io.o vvp_object.o
-lustre-objs += range_lock.o
+lustre-objs += range_lock.o pcc.o
 
 EXTRA_DIST := $(lustre-objs:.o=.c) llite_internal.h rw26.c super25.c
-EXTRA_DIST += vvp_internal.h range_lock.h
+EXTRA_DIST += vvp_internal.h range_lock.h pcc.h
 
 @XATTR_HANDLER_TRUE@EXTRA_DIST += xattr26.c
 @XATTR_HANDLER_FALSE@EXTRA_DIST += xattr.c
index 914fff0..c70cd20 100644 (file)
@@ -1983,6 +1983,72 @@ migrate_free:
                RETURN(ll_ioctl_fsgetxattr(inode, cmd, arg));
        case LL_IOC_FSSETXATTR:
                RETURN(ll_ioctl_fssetxattr(inode, cmd, arg));
+       case LL_IOC_PCC_DETACH: {
+               struct lu_pcc_detach *detach;
+               struct lu_fid *fid;
+               struct inode *inode2;
+               unsigned long ino;
+
+               /*
+                * The reason why a dir IOCTL is used to detach a PCC-cached
+                * file rather than making it a file IOCTL is:
+                * When PCC caching a file, it will attach the file firstly,
+                * and increase the refcount of PCC inode (pcci->pcci_refcount)
+                * from 0 to 1.
+                * When detaching a PCC-cached file, it will check whether the
+                * refcount is 1. If so, the file can be detached successfully.
+                * Otherwise, it means there are some users opened and using
+                * the file currently, and it will return -EBUSY.
+                * Each open on the PCC-cached file will increase the refcount
+                * of the PCC inode;
+                * Each close on the PCC-cached file will decrease the refcount
+                * of the PCC inode;
+                * If a file IOCTL were used to detach a PCC-cached file, the
+                * file would have to be opened first, which increases the
+                * refcount. The detach IOCTL would then always return -EBUSY
+                * because the PCC inode refcount is larger than 1. One might
+                * argue that the IOCTL could just decrease the refcount of
+                * the PCC inode, return success and let the close of the
+                * IOCTL file handle perform the real detach. But this may
+                * leave the PCC file in an inconsistent state. E.g. Process
+                * A gets a successful return from the detach IOCTL; Process B
+                * opens the file before Process A finally closes the IOCTL
+                * file handle. The subsequent I/O of Process B is then
+                * directed to PCC although the file was already detached from
+                * the point of view of Process A.
+                * Using a dir IOCTL avoids the problem above.
+                */
+               OBD_ALLOC_PTR(detach);
+               if (detach == NULL)
+                       RETURN(-ENOMEM);
+
+               if (copy_from_user(detach,
+                                  (const struct lu_pcc_detach __user *)arg,
+                                  sizeof(*detach)))
+                       GOTO(out_detach, rc = -EFAULT);
+
+               fid = &detach->pccd_fid;
+               ino = cl_fid_build_ino(fid, ll_need_32bit_api(sbi));
+               inode2 = ilookup5(inode->i_sb, ino, ll_test_inode_by_fid, fid);
+               if (inode2 == NULL)
+                       /* Target inode is not in inode cache, and PCC file
+                        * has already been released, return immediately.
+                        */
+                       GOTO(out_detach, rc = 0);
+
+               if (!S_ISREG(inode2->i_mode))
+                       GOTO(out_iput, rc = -EINVAL);
+
+               if (!inode_owner_or_capable(inode2))
+                       GOTO(out_iput, rc = -EPERM);
+
+               rc = pcc_ioctl_detach(inode2);
+out_iput:
+               iput(inode2);
+out_detach:
+               OBD_FREE_PTR(detach);
+               RETURN(rc);
+       }
        default:
                RETURN(obd_iocontrol(cmd, sbi->ll_dt_exp, 0, NULL,
                                     (void __user *)arg));
index 6f26724..e340c71 100644 (file)
@@ -58,6 +58,11 @@ struct split_param {
        __u16           sp_mirror_id;
 };
 
+struct pcc_param {     /* close-intent data for MDS_PCC_ATTACH */
+       __u64   pa_data_version;        /* sent as op_data_version */
+       __u32   pa_archive_id;          /* sent as op_archive_id */
+};
+
 static int
 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 
@@ -73,6 +78,7 @@ static struct ll_file_data *ll_file_data_get(void)
                return NULL;
 
        fd->fd_write_failed = false;
+       pcc_file_init(&fd->fd_pcc_file);
 
        return fd;
 }
@@ -190,6 +196,17 @@ static int ll_close_inode_openhandle(struct inode *inode,
                break;
        }
 
+       case MDS_PCC_ATTACH: {
+               struct pcc_param *param = data;
+
+               LASSERT(data != NULL);
+               op_data->op_bias |= MDS_HSM_RELEASE | MDS_PCC_ATTACH;
+               op_data->op_archive_id = param->pa_archive_id;
+               op_data->op_data_version = param->pa_data_version;
+               op_data->op_lease_handle = och->och_lease_handle;
+               break;
+       }
+
        case MDS_HSM_RELEASE:
                LASSERT(data != NULL);
                op_data->op_bias |= MDS_HSM_RELEASE;
@@ -372,6 +389,8 @@ int ll_file_release(struct inode *inode, struct file *file)
                RETURN(0);
        }
 
+       pcc_file_release(inode, file);
+
        if (!S_ISDIR(inode->i_mode)) {
                if (lli->lli_clob != NULL)
                        lov_read_and_clear_async_rc(lli->lli_clob);
@@ -813,6 +832,10 @@ restart:
                if (rc)
                        GOTO(out_och_free, rc);
        }
+       rc = pcc_file_open(inode, file);
+       if (rc)
+               GOTO(out_och_free, rc);
+
        mutex_unlock(&lli->lli_och_mutex);
         fd = NULL;
 
@@ -837,6 +860,7 @@ out_och_free:
 out_openerr:
                if (lli->lli_opendir_key == fd)
                        ll_deauthorize_statahead(inode, fd);
+
                if (fd != NULL)
                        ll_file_data_put(fd);
         } else {
@@ -1617,6 +1641,22 @@ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
        ssize_t result;
        ssize_t rc2;
        __u16 refcheck;
+       bool cached = false;
+
+       /**
+        * Currently, when a PCC read fails, we do not fall back to the
+        * normal read path, just return the error.
+        * The reason is that for RW-PCC, the file data may be modified
+        * in the PCC and inconsistent with the data on OSTs (or the file
+        * data has been removed from the Lustre file system); in that
+        * case, falling back to the normal read path may read the wrong
+        * data.
+        * TODO: for RO-PCC (readonly PCC), fall back to normal read
+        * path: read data from data copy on OSTs.
+        */
+       result = pcc_file_read_iter(iocb, to, &cached);
+       if (cached)
+               return result;
 
        ll_ras_enter(iocb->ki_filp);
 
@@ -1712,9 +1752,24 @@ static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
        struct lu_env *env;
        ssize_t rc_tiny = 0, rc_normal;
        __u16 refcheck;
+       bool cached = false;
+       int result;
 
        ENTRY;
 
+       /**
+        * When a PCC write fails, we do not fall back to the normal
+        * write path, just return the error. The reason is that:
+        * PCC is actually an HSM device, and HSM does not handle
+        * failures, especially -ENOSPC when space runs out. Moreover,
+        * falling back to the normal I/O path on an ENOSPC failure
+        * needs to restore the file data to OSTs first and redo the
+        * write, making the logic of PCC very complex.
+        */
+       result = pcc_file_write_iter(iocb, from, &cached);
+       if (cached)
+               return result;
+
        /* NB: we can't do direct IO for tiny writes because they use the page
         * cache, we can't do sync writes because tiny writes can't flush
         * pages, and we can't do append writes because we can't guarantee the
@@ -1895,8 +1950,16 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
         struct vvp_io_args *args;
         ssize_t             result;
        __u16               refcheck;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+
         ENTRY;
 
+       /* pcc cache path */
+       if (pcc_file && file_inode(pcc_file)->i_fop->splice_read)
+               return file_inode(pcc_file)->i_fop->splice_read(pcc_file,
+                                               ppos, pipe, count, flags);
+
        ll_ras_enter(in_file);
 
         env = cl_env_get(&refcheck);
@@ -3105,13 +3168,16 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct obd_client_handle *och = NULL;
        struct split_param sp;
-       bool lease_broken;
+       struct pcc_param param;
+       bool lease_broken = false;
        fmode_t fmode = 0;
        enum mds_op_bias bias = 0;
        struct file *layout_file = NULL;
        void *data = NULL;
        size_t data_size = 0;
-       long rc;
+       bool attached = false;
+       long rc, rc2 = 0;
+
        ENTRY;
 
        mutex_lock(&lli->lli_och_mutex);
@@ -3122,22 +3188,22 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
        mutex_unlock(&lli->lli_och_mutex);
 
        if (och == NULL)
-               GOTO(out, rc = -ENOLCK);
+               RETURN(-ENOLCK);
 
        fmode = och->och_flags;
 
        switch (ioc->lil_flags) {
        case LL_LEASE_RESYNC_DONE:
                if (ioc->lil_count > IOC_IDS_MAX)
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_lease_close, rc = -EINVAL);
 
                data_size = offsetof(typeof(*ioc), lil_ids[ioc->lil_count]);
                OBD_ALLOC(data, data_size);
                if (!data)
-                       GOTO(out, rc = -ENOMEM);
+                       GOTO(out_lease_close, rc = -ENOMEM);
 
                if (copy_from_user(data, (void __user *)arg, data_size))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                bias = MDS_CLOSE_RESYNC_DONE;
                break;
@@ -3145,19 +3211,19 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                int fd;
 
                if (ioc->lil_count != 1)
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_lease_close, rc = -EINVAL);
 
                arg += sizeof(*ioc);
                if (copy_from_user(&fd, (void __user *)arg, sizeof(__u32)))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                layout_file = fget(fd);
                if (!layout_file)
-                       GOTO(out, rc = -EBADF);
+                       GOTO(out_lease_close, rc = -EBADF);
 
                if ((file->f_flags & O_ACCMODE) == O_RDONLY ||
                                (layout_file->f_flags & O_ACCMODE) == O_RDONLY)
-                       GOTO(out, rc = -EPERM);
+                       GOTO(out_lease_close, rc = -EPERM);
 
                data = file_inode(layout_file);
                bias = MDS_CLOSE_LAYOUT_MERGE;
@@ -3168,20 +3234,20 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                int mirror_id;
 
                if (ioc->lil_count != 2)
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_lease_close, rc = -EINVAL);
 
                arg += sizeof(*ioc);
                if (copy_from_user(&fdv, (void __user *)arg, sizeof(__u32)))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                arg += sizeof(__u32);
                if (copy_from_user(&mirror_id, (void __user *)arg,
                                   sizeof(__u32)))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                layout_file = fget(fdv);
                if (!layout_file)
-                       GOTO(out, rc = -EBADF);
+                       GOTO(out_lease_close, rc = -EBADF);
 
                sp.sp_inode = file_inode(layout_file);
                sp.sp_mirror_id = (__u16)mirror_id;
@@ -3189,11 +3255,35 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                bias = MDS_CLOSE_LAYOUT_SPLIT;
                break;
        }
+       case LL_LEASE_PCC_ATTACH:
+               if (ioc->lil_count != 1)
+                       RETURN(-EINVAL);
+
+               arg += sizeof(*ioc);
+               if (copy_from_user(&param.pa_archive_id, (void __user *)arg,
+                                  sizeof(__u32)))
+                       GOTO(out_lease_close, rc2 = -EFAULT);
+
+               rc2 = pcc_readwrite_attach(file, inode, param.pa_archive_id);
+               if (rc2)
+                       GOTO(out_lease_close, rc2);
+
+               attached = true;
+               /* Grab latest data version */
+               rc2 = ll_data_version(inode, &param.pa_data_version,
+                                    LL_DV_WR_FLUSH);
+               if (rc2)
+                       GOTO(out_lease_close, rc2);
+
+               data = &param;
+               bias = MDS_PCC_ATTACH;
+               break;
        default:
                /* without close intent */
                break;
        }
 
+out_lease_close:
        rc = ll_lease_close_intent(och, inode, &lease_broken, bias, data);
        if (rc < 0)
                GOTO(out, rc);
@@ -3217,6 +3307,12 @@ out:
                if (layout_file)
                        fput(layout_file);
                break;
+       case LL_LEASE_PCC_ATTACH:
+               if (!rc)
+                       rc = rc2;
+               rc = pcc_readwrite_attach_fini(file, inode, lease_broken,
+                                              rc, attached);
+               break;
        }
 
        if (!rc)
@@ -3740,6 +3836,29 @@ out_ladvise:
                rc = ll_heat_set(inode, flags);
                RETURN(rc);
        }
+       case LL_IOC_PCC_STATE: {
+               struct lu_pcc_state __user *ustate =
+                       (struct lu_pcc_state __user *)arg;
+               struct lu_pcc_state *state;
+
+               OBD_ALLOC_PTR(state);
+               if (state == NULL)
+                       RETURN(-ENOMEM);
+
+               if (copy_from_user(state, ustate, sizeof(*state)))
+                       GOTO(out_state, rc = -EFAULT);
+
+               rc = pcc_ioctl_state(inode, state);
+               if (rc)
+                       GOTO(out_state, rc);
+
+               if (copy_to_user(ustate, state, sizeof(*state)))
+                       GOTO(out_state, rc = -EFAULT);
+
+out_state:
+               OBD_FREE_PTR(state);
+               RETURN(rc);
+       }
        default:
                RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                     (void __user *)arg));
@@ -3936,7 +4055,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
 #endif
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct ptlrpc_request *req;
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
        int rc, err;
        ENTRY;
 
@@ -3944,6 +4065,19 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
               PFID(ll_inode2fid(inode)), inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
 
+       /* pcc cache path */
+       if (pcc_file)
+#ifdef HAVE_FILE_FSYNC_4ARGS
+               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                       start, end, datasync);
+#elif defined(HAVE_FILE_FSYNC_2ARGS)
+               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                       datasync);
+#else
+               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                       dentry, datasync);
+#endif
+
 #ifdef HAVE_FILE_FSYNC_4ARGS
        rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
        inode_lock(inode);
@@ -4503,27 +4637,8 @@ static int ll_merge_md_attr(struct inode *inode)
        RETURN(0);
 }
 
-static inline dev_t ll_compat_encode_dev(dev_t dev)
-{
-       /* The compat_sys_*stat*() syscalls will fail unless the
-        * device majors and minors are both less than 256. Note that
-        * the value returned here will be passed through
-        * old_encode_dev() in cp_compat_stat(). And so we are not
-        * trying to return a valid compat (u16) device number, just
-        * one that will pass the old_valid_dev() check. */
-
-       return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff);
-}
-
-#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
-int ll_getattr(const struct path *path, struct kstat *stat,
-              u32 request_mask, unsigned int flags)
-{
-       struct dentry *de = path->dentry;
-#else
-int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
+int ll_getattr_dentry(struct dentry *de, struct kstat *stat)
 {
-#endif
        struct inode *inode = de->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
@@ -4536,6 +4651,11 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
                RETURN(rc);
 
        if (S_ISREG(inode->i_mode)) {
+               bool cached = false;
+
+               rc = pcc_inode_getattr(inode, &cached);
+               if (cached && rc < 0)
+                       RETURN(rc);
                /* In case of restore, the MDT has the right size and has
                 * already send it back without granting the layout lock,
                 * inode is up-to-date so glimpse is useless.
@@ -4543,7 +4663,7 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
                 * restore the MDT holds the layout lock so the glimpse will
                 * block up to the end of restore (getattr will block)
                 */
-               if (!ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
+               if (!cached && !ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
                        rc = ll_glimpse_size(inode);
                        if (rc < 0)
                                RETURN(rc);
@@ -4589,6 +4709,18 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
         return 0;
 }
 
+#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
+int ll_getattr(const struct path *path, struct kstat *stat,
+              u32 request_mask, unsigned int flags)    /* both unused */
+{      /* enhanced-getattr signature: the dentry comes from path */
+       struct dentry *de = path->dentry;
+#else
+int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
+{
+#endif
+       return ll_getattr_dentry(de, stat);     /* common implementation */
+}
+
 static int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
                     __u64 start, __u64 len)
 {
index 1315c5f..5f7aea2 100644 (file)
@@ -48,6 +48,7 @@
 
 #include "vvp_internal.h"
 #include "range_lock.h"
+#include "pcc.h"
 
 #ifndef FMODE_EXEC
 #define FMODE_EXEC 0
@@ -213,6 +214,9 @@ struct ll_inode_info {
                         * accurate if the file is shared by different jobs.
                         */
                        char                    lli_jobid[LUSTRE_JOBID_SIZE];
+
+                       struct mutex             lli_pcc_lock;
+                       struct pcc_inode        *lli_pcc_inode;
                };
        };
 
@@ -343,6 +347,11 @@ static inline struct ll_inode_info *ll_i2info(struct inode *inode)
        return container_of(inode, struct ll_inode_info, lli_vfs_inode);
 }
 
+static inline struct pcc_inode *ll_i2pcci(struct inode *inode)
+{      /* fetch the PCC inode pointer kept in the llite inode info */
+       return ll_i2info(inode)->lli_pcc_inode;
+}
+
 /* default to about 64M of readahead on a given system. */
 #define SBI_DEFAULT_READAHEAD_MAX              MiB_TO_PAGES(64UL)
 
@@ -583,6 +592,9 @@ struct ll_sb_info {
 
        /* filesystem fsname */
        char                      ll_fsname[LUSTRE_MAXFSNAME + 1];
+
+       /* Persistent Client Cache */
+       struct pcc_super          ll_pcc_super;
 };
 
 #define SBI_DEFAULT_HEAT_DECAY_WEIGHT  ((80 * 256 + 50) / 100)
@@ -698,6 +710,7 @@ struct ll_file_data {
        /* The layout version when resync starts. Resync I/O should carry this
         * layout version for verification to OST objects */
        __u32 fd_layout_version;
+       struct pcc_file fd_pcc_file;
 };
 
 void llite_tunables_unregister(void);
@@ -882,6 +895,7 @@ int ll_getattr(const struct path *path, struct kstat *stat,
 #else
 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat);
 #endif
+int ll_getattr_dentry(struct dentry *de, struct kstat *stat);
 struct posix_acl *ll_get_acl(struct inode *inode, int type);
 #ifdef HAVE_IOP_SET_ACL
 #ifdef CONFIG_FS_POSIX_ACL
@@ -1477,6 +1491,18 @@ static inline void d_lustre_revalidate(struct dentry *dentry)
        spin_unlock(&dentry->d_lock);
 }
 
+static inline dev_t ll_compat_encode_dev(dev_t dev)
+{
+       /* The compat_sys_*stat*() syscalls will fail unless the
+        * device majors and minors are both less than 256. Note that
+        * the value returned here will be passed through
+        * old_encode_dev() in cp_compat_stat(). So we are not
+        * trying to return a valid compat (u16) device number, just
+        * one that will pass the old_valid_dev() check. */
+
+       return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff); /* 8 bits each */
+}
+
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
index 91eb088..5853551 100644 (file)
@@ -138,6 +138,7 @@ static struct ll_sb_info *ll_init_sbi(void)
        sbi->ll_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&sbi->ll_squash.rsi_nosquash_nids);
        init_rwsem(&sbi->ll_squash.rsi_sem);
+       pcc_super_init(&sbi->ll_pcc_super);
 
        /* Per-filesystem file heat */
        sbi->ll_heat_decay_weight = SBI_DEFAULT_HEAT_DECAY_WEIGHT;
@@ -157,6 +158,7 @@ static void ll_free_sbi(struct super_block *sb)
                        cl_cache_decref(sbi->ll_cache);
                        sbi->ll_cache = NULL;
                }
+               pcc_super_fini(&sbi->ll_pcc_super);
                OBD_FREE(sbi, sizeof(*sbi));
        }
        EXIT;
@@ -228,7 +230,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT2_LOCK_CONVERT |
                                   OBD_CONNECT2_ARCHIVE_ID_ARRAY |
                                   OBD_CONNECT2_LSOM |
-                                  OBD_CONNECT2_ASYNC_DISCARD;
+                                  OBD_CONNECT2_ASYNC_DISCARD |
+                                  OBD_CONNECT2_PCC;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
@@ -980,6 +983,8 @@ void ll_lli_init(struct ll_inode_info *lli)
                spin_lock_init(&lli->lli_heat_lock);
                obd_heat_clear(lli->lli_heat_instances, OBD_HEAT_COUNT);
                lli->lli_heat_flags = 0;
+               mutex_init(&lli->lli_pcc_lock);
+               lli->lli_pcc_inode = NULL;
        }
        mutex_init(&lli->lli_layout_mutex);
        memset(lli->lli_jobid, 0, sizeof(lli->lli_jobid));
@@ -1512,17 +1517,20 @@ void ll_clear_inode(struct inode *inode)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
+
         ENTRY;
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
               PFID(ll_inode2fid(inode)), inode);
 
-        if (S_ISDIR(inode->i_mode)) {
-                /* these should have been cleared in ll_file_release */
-                LASSERT(lli->lli_opendir_key == NULL);
-                LASSERT(lli->lli_sai == NULL);
-                LASSERT(lli->lli_opendir_pid == 0);
-        }
+       if (S_ISDIR(inode->i_mode)) {
+               /* these should have been cleared in ll_file_release */
+               LASSERT(lli->lli_opendir_key == NULL);
+               LASSERT(lli->lli_sai == NULL);
+               LASSERT(lli->lli_opendir_pid == 0);
+       } else {
+               pcc_inode_free(inode);
+       }
 
        md_null_inode(sbi->ll_md_exp, ll_inode2fid(inode));
 
@@ -1746,14 +1754,28 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr,
        if (attr->ia_valid & (ATTR_SIZE | ATTR_ATIME | ATTR_ATIME_SET |
                              ATTR_MTIME | ATTR_MTIME_SET | ATTR_CTIME) ||
            xvalid & OP_XVALID_CTIME_SET) {
-               /* For truncate and utimes sending attributes to OSTs, setting
-                * mtime/atime to the past will be performed under PW [0:EOF]
-                * extent lock (new_size:EOF for truncate).  It may seem
-                * excessive to send mtime/atime updates to OSTs when not
-                * setting times to past, but it is necessary due to possible
-                * time de-synchronization between MDT inode and OST objects
-                */
-               rc = cl_setattr_ost(lli->lli_clob, attr, xvalid, 0);
+               bool cached = false;
+
+               rc = pcc_inode_setattr(inode, attr, &cached);
+               if (cached) {
+                       if (rc) {
+                               CERROR("%s: PCC inode "DFID" setattr failed: "
+                                      "rc = %d\n",
+                                      ll_i2sbi(inode)->ll_fsname,
+                                      PFID(&lli->lli_fid), rc);
+                               GOTO(out, rc);
+                       }
+               } else {
+                       /* For truncate and utimes sending attributes to OSTs,
+                        * setting mtime/atime to the past will be performed
+                        * under PW [0:EOF] extent lock (new_size:EOF for
+                        * truncate). It may seem excessive to send mtime/atime
+                        * updates to OSTs when not setting times to past, but
+                        * it is necessary due to possible time
+                        * de-synchronization between MDT inode and OST objects
+                        */
+                       rc = cl_setattr_ost(lli->lli_clob, attr, xvalid, 0);
+               }
        }
 
        /* If the file was restored, it needs to set dirty flag.
index 989a53b..65e4af6 100644 (file)
@@ -504,18 +504,27 @@ int ll_file_mmap(struct file *file, struct vm_area_struct * vma)
 {
        struct inode *inode = file_inode(file);
         int rc;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+
         ENTRY;
 
+       /* pcc cache path */
+       if (pcc_file) {
+               vma->vm_file = pcc_file;
+               return file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
+       }
+
         if (ll_file_nolock(file))
                 RETURN(-EOPNOTSUPP);
 
         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_MAP, 1);
         rc = generic_file_mmap(file, vma);
         if (rc == 0) {
-                vma->vm_ops = &ll_file_vm_ops;
+               vma->vm_ops = &ll_file_vm_ops;
                 vma->vm_ops->open(vma);
                 /* update the inode's size and mtime */
-                rc = ll_glimpse_size(inode);
+               rc = ll_glimpse_size(inode);
         }
 
         RETURN(rc);
index 5030e0f..ed66cd8 100644 (file)
@@ -1311,6 +1311,43 @@ static ssize_t ll_nosquash_nids_seq_write(struct file *file,
 
 LDEBUGFS_SEQ_FOPS(ll_nosquash_nids);
 
+static int ll_pcc_seq_show(struct seq_file *m, void *v)
+{      /* hand this super block's PCC state to pcc_super_dump() */
+       struct super_block *sb = m->private;    /* v is not used */
+       struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+       return pcc_super_dump(&sbi->ll_pcc_super, m);
+}
+
+static ssize_t ll_pcc_seq_write(struct file *file, const char __user *buffer,
+                               size_t count, loff_t *off)
+{      /* pass a user-written command string to pcc_cmd_handle() */
+       struct seq_file *m = file->private_data;
+       struct super_block *sb = m->private;
+       struct ll_sb_info *sbi = ll_s2sbi(sb);
+       int rc;
+       char *kernbuf;
+
+       if (count >= LPROCFS_WR_PCC_MAX_CMD)
+               return -EINVAL;         /* command too long */
+
+       if (!(exp_connect_flags2(sbi->ll_md_exp) & OBD_CONNECT2_PCC))
+               return -EOPNOTSUPP;     /* MD export lacks OBD_CONNECT2_PCC */
+
+       OBD_ALLOC(kernbuf, count + 1);  /* +1 presumably for NUL; confirm */
+       if (kernbuf == NULL)
+               return -ENOMEM;
+
+       if (copy_from_user(kernbuf, buffer, count))
+               GOTO(out_free_kernbuff, rc = -EFAULT);
+
+       rc = pcc_cmd_handle(kernbuf, count, &sbi->ll_pcc_super);
+out_free_kernbuff:
+       OBD_FREE(kernbuf, count + 1);   /* same size as the allocation */
+       return rc ? rc : count;         /* non-zero rc, else whole count */
+}
+LPROC_SEQ_FOPS(ll_pcc);
+
 struct lprocfs_vars lprocfs_llite_obd_vars[] = {
        { .name =       "site",
          .fops =       &ll_site_stats_fops                     },
@@ -1332,6 +1369,8 @@ struct lprocfs_vars lprocfs_llite_obd_vars[] = {
          .fops =       &ll_root_squash_fops                    },
        { .name =       "nosquash_nids",
          .fops =       &ll_nosquash_nids_fops                  },
+       { .name =       "pcc",
+         .fops =       &ll_pcc_fops,                           },
        { NULL }
 };
 
index 80bfd7b..afed81a 100644 (file)
@@ -729,14 +729,21 @@ out:
        return rc;
 }
 
+struct pcc_create_attach {     /* ll_atomic_open() -> ll_lookup_it() */
+       struct pcc_dataset *pca_dataset; /* NULL: skip PCC create */
+       struct dentry *pca_dentry;      /* set by pcc_inode_create() */
+};
+
 static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                                   struct lookup_intent *it,
-                                  void **secctx, __u32 *secctxlen)
+                                  void **secctx, __u32 *secctxlen,
+                                  struct pcc_create_attach *pca)
 {
        struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
        struct dentry *save = dentry, *retval;
        struct ptlrpc_request *req = NULL;
        struct md_op_data *op_data = NULL;
+       struct lov_user_md *lum = NULL;
        __u32 opc;
        int rc;
        char secctx_name[XATTR_NAME_MAX + 1];
@@ -817,6 +824,36 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                }
        }
 
+       if (pca && pca->pca_dataset) {
+               struct pcc_dataset *dataset = pca->pca_dataset;
+
+               OBD_ALLOC_PTR(lum);
+               if (lum == NULL)
+                       GOTO(out, retval = ERR_PTR(-ENOMEM));
+
+               lum->lmm_magic = LOV_USER_MAGIC_V1;
+               lum->lmm_pattern = LOV_PATTERN_F_RELEASED | LOV_PATTERN_RAID0;
+               lum->lmm_stripe_size = 0;
+               lum->lmm_stripe_count = 0;
+               lum->lmm_stripe_offset = 0;
+
+               op_data->op_data = lum;
+               op_data->op_data_size = sizeof(*lum);
+               op_data->op_archive_id = dataset->pccd_id;
+
+               rc = obd_fid_alloc(NULL, ll_i2mdexp(parent), &op_data->op_fid2,
+                                  op_data);
+               if (rc)
+                       GOTO(out, retval = ERR_PTR(rc));
+
+               rc = pcc_inode_create(dataset, &op_data->op_fid2,
+                                     &pca->pca_dentry);
+               if (rc)
+                       GOTO(out, retval = ERR_PTR(rc));
+
+               it->it_flags |= MDS_OPEN_PCC;
+       }
+
        rc = md_intent_lock(ll_i2mdexp(parent), op_data, it, &req,
                            &ll_md_blocking_ast, 0);
        /* If the MDS allows the client to chgrp (CFS_SETGRP_PERM), but the
@@ -878,6 +915,9 @@ out:
                ll_finish_md_op_data(op_data);
        }
 
+       if (lum != NULL)
+               OBD_FREE_PTR(lum);
+
        ptlrpc_req_finished(req);
        return retval;
 }
@@ -906,7 +946,7 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry,
                itp = NULL;
        else
                itp = &it;
-       de = ll_lookup_it(parent, dentry, itp, NULL, NULL);
+       de = ll_lookup_it(parent, dentry, itp, NULL, NULL, NULL);
 
        if (itp != NULL)
                ll_intent_release(itp);
@@ -927,6 +967,9 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry,
        long long lookup_flags = LOOKUP_OPEN;
        void *secctx = NULL;
        __u32 secctxlen = 0;
+       struct ll_sb_info *sbi;
+       struct pcc_create_attach pca = {NULL, NULL};
+       struct pcc_dataset *dataset = NULL;
        int rc = 0;
        ENTRY;
 
@@ -961,13 +1004,22 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry,
        if (open_flags & O_CREAT) {
                it->it_op |= IT_CREAT;
                lookup_flags |= LOOKUP_CREATE;
+               sbi = ll_i2sbi(dir);
+               /* Volatile file is used for HSM restore, so do not use PCC */
+               if (!filename_is_volatile(dentry->d_name.name,
+                                         dentry->d_name.len, NULL)) {
+                       dataset = pcc_dataset_get(&sbi->ll_pcc_super,
+                                                 ll_i2info(dir)->lli_projid,
+                                                 0);
+                       pca.pca_dataset = dataset;
+               }
        }
        it->it_create_mode = (mode & S_IALLUGO) | S_IFREG;
        it->it_flags = (open_flags & ~O_ACCMODE) | OPEN_FMODE(open_flags);
        it->it_flags &= ~MDS_OPEN_FL_INTERNAL;
 
        /* Dentry added to dcache tree in ll_lookup_it */
-       de = ll_lookup_it(dir, dentry, it, &secctx, &secctxlen);
+       de = ll_lookup_it(dir, dentry, it, &secctx, &secctxlen, &pca);
        if (IS_ERR(de))
                rc = PTR_ERR(de);
        else if (de != NULL)
@@ -986,9 +1038,20 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry,
                                        dput(de);
                                goto out_release;
                        }
+                       if (dataset != NULL && dentry->d_inode) {
+                               rc = pcc_inode_create_fini(dataset,
+                                                          dentry->d_inode,
+                                                          pca.pca_dentry);
+                               if (rc) {
+                                       if (de != NULL)
+                                               dput(de);
+                                       GOTO(out_release, rc);
+                               }
+                       }
 
                        *opened |= FILE_CREATED;
                }
+
                if (dentry->d_inode && it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Open dentry. */
                        if (S_ISFIFO(dentry->d_inode->i_mode)) {
@@ -1011,6 +1074,8 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry,
        }
 
 out_release:
+       if (dataset != NULL)
+               pcc_dataset_put(dataset);
        ll_intent_release(it);
        OBD_FREE(it, sizeof(*it));
 
@@ -1075,7 +1140,7 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry,
                                RETURN((struct dentry *)it);
                }
 
-               de = ll_lookup_it(parent, dentry, it, NULL, NULL);
+               de = ll_lookup_it(parent, dentry, it, NULL, NULL, NULL);
                if (de)
                        dentry = de;
                if ((nd->flags & LOOKUP_OPEN) && !IS_ERR(dentry)) { /* Open */
@@ -1115,7 +1180,7 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry,
                        OBD_FREE(it, sizeof(*it));
                }
        } else {
-               de = ll_lookup_it(parent, dentry, NULL, NULL, NULL);
+               de = ll_lookup_it(parent, dentry, NULL, NULL, NULL, NULL);
        }
 
        RETURN(de);
diff --git a/lustre/llite/pcc.c b/lustre/llite/pcc.c
new file mode 100644 (file)
index 0000000..2de2715
--- /dev/null
@@ -0,0 +1,1091 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2017, DDN Storage Corporation.
+ */
+/*
+ * Persistent Client Cache
+ *
+ * PCC is a new framework which provides a group of local cache on Lustre
+ * client side. It works in two modes: RW-PCC enables a read-write cache on the
+ * local SSDs of a single client; RO-PCC provides a read-only cache on the
+ * local SSDs of multiple clients. Less overhead is visible to the applications
+ * and network latencies and lock conflicts can be significantly reduced.
+ *
+ * For RW-PCC, no global namespace will be provided. Each client uses its own
+ * local storage as a cache for itself. Local file system is used to manage
+ * the data on local caches. Cached I/O is directed to local file system while
+ * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
+ * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
+ * PCC has a copytool instance running with unique archive number. Any remote
+ * access from another Lustre client would trigger the data synchronization. If
+ * a client with RW-PCC goes offline, the cached data becomes inaccessible for
+ * other client temporarily. And after the RW-PCC client reboots and the
+ * copytool restarts, the data will be accessible again.
+ *
+ * Following is what will happen in different conditions for RW-PCC:
+ *
+ * > When file is being created on RW-PCC
+ *
+ * A normal HSM released file is created on MDT;
+ * An empty mirror file is created on local cache;
+ * The HSM status of the Lustre file will be set to archived and released;
+ * The archive number will be set to the proper value.
+ *
+ * > When file is being prefetched to RW-PCC
+ *
+ * A file is copied to the local cache;
+ * The HSM status of the Lustre file will be set to archived and released;
+ * The archive number will be set to the proper value.
+ *
+ * > When file is being accessed from PCC
+ *
+ * Data will be read directly from local cache;
+ * Metadata will be read from MDT, except file size;
+ * File size will be obtained from the local cache.
+ *
+ * > When PCC cached file is being accessed on another client
+ *
+ * RW-PCC cached files are automatically restored when a process on another
+ * client tries to read or modify them. The corresponding I/O will block
+ * waiting for the released file to be restored. This is transparent to the
+ * process.
+ *
+ * For RW-PCC, when a file is being created, a rule-based policy is used to
+ * determine whether it will be cached. Rule-based caching of newly created
+ * files can determine which file can use a cache on PCC directly without any
+ * admission control.
+ *
+ * RW-PCC design can accelerate I/O intensive applications with one-to-one
+ * mappings between files and accessing clients. However, in several use cases,
+ * files will never be updated, but need to be read simultaneously from many
+ * clients. RO-PCC implements a read-only caching on Lustre clients using
+ * SSDs. RO-PCC is based on the same framework as RW-PCC, except
+ * that no HSM mechanism is used.
+ *
+ * The main advantages to use this SSD cache on the Lustre clients via PCC
+ * is that:
+ * - The I/O stack becomes much simpler for the cached data, as there is no
+ *   interference with I/Os from other clients, which enables easier
+ *   performance optimizations;
+ * - The requirements on the HW inside the client nodes are small, any kind of
+ *   SSDs or even HDDs can be used as cache devices;
+ * - Caching reduces the pressure on the object storage targets (OSTs), as
+ *   small or random I/Os can be regularized to big sequential I/Os and
+ *   temporary files do not even need to be flushed to OSTs.
+ *
+ * PCC can accelerate applications with certain I/O patterns:
+ * - small-sized random writes (< 1MB) from a single client
+ * - repeated read of data that is larger than RAM
+ * - clients with high network latency
+ *
+ * Author: Li Xi <lixi@ddn.com>
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include "pcc.h"
+#include <linux/namei.h>
+#include <linux/file.h>
+#include <lustre_compat.h>
+#include "llite_internal.h"
+
+struct kmem_cache *pcc_inode_slab;
+
+/* Bring a PCC superblock to its empty state: no datasets, lock ready. */
+void pcc_super_init(struct pcc_super *super)
+{
+       INIT_LIST_HEAD(&super->pccs_datasets);
+       spin_lock_init(&super->pccs_lock);
+}
+
+/**
+ * pcc_dataset_add - Add a cache policy to control which files need to be
+ * cached and where they will be cached.
+ *
+ * @super: superblock of pcc
+ * @pathname: root path of pcc
+ * @archive_id: HSM archive ID; at most one dataset per ID is kept in @super
+ * @projid: files with specified project ID will be cached.
+ *
+ * Return: 0 on success, -ENOMEM if the dataset cannot be allocated,
+ * -ENAMETOOLONG if @pathname does not fit in pccd_pathname, -EEXIST if a
+ * dataset with @archive_id is already registered, or the error returned by
+ * kern_path() for @pathname.
+ */
+static int
+pcc_dataset_add(struct pcc_super *super, const char *pathname,
+               __u32 archive_id, __u32 projid)
+{
+       int rc;
+       struct pcc_dataset *dataset;
+       struct pcc_dataset *tmp;
+       bool found = false;
+
+       OBD_ALLOC_PTR(dataset);
+       if (dataset == NULL)
+               return -ENOMEM;
+
+       /*
+        * strlcpy() always NUL-terminates. Reject a name that would be
+        * truncated instead of storing a path that differs from @pathname.
+        * Done before kern_path() so this failure needs no path_put().
+        */
+       if (strlcpy(dataset->pccd_pathname, pathname,
+                   sizeof(dataset->pccd_pathname)) >=
+           sizeof(dataset->pccd_pathname)) {
+               OBD_FREE_PTR(dataset);
+               return -ENAMETOOLONG;
+       }
+
+       rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
+       if (unlikely(rc)) {
+               OBD_FREE_PTR(dataset);
+               return rc;
+       }
+       dataset->pccd_id = archive_id;
+       dataset->pccd_projid = projid;
+       atomic_set(&dataset->pccd_refcount, 1);
+
+       spin_lock(&super->pccs_lock);
+       list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
+               if (tmp->pccd_id == archive_id) {
+                       found = true;
+                       break;
+               }
+       }
+       /* New entries are inserted at the head of the list */
+       if (!found)
+               list_add(&dataset->pccd_linkage, &super->pccs_datasets);
+       spin_unlock(&super->pccs_lock);
+
+       if (found) {
+               /* Drops the only reference: releases pccd_path and frees */
+               pcc_dataset_put(dataset);
+               rc = -EEXIST;
+       }
+
+       return rc;
+}
+
+/*
+ * Look up a dataset by project ID and/or archive ID and take a reference
+ * on it. A zero @projid or @archive_id means "do not filter on this key";
+ * when both are zero nothing is looked up and NULL is returned.
+ *
+ * The first matching list entry wins. Returns NULL when nothing matches;
+ * otherwise the caller must drop the reference with pcc_dataset_put().
+ */
+struct pcc_dataset *
+pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
+{
+       struct pcc_dataset *iter;
+       struct pcc_dataset *match = NULL;
+
+       if (projid == 0 && archive_id == 0)
+               return NULL;
+
+       spin_lock(&super->pccs_lock);
+       list_for_each_entry(iter, &super->pccs_datasets, pccd_linkage) {
+               bool projid_ok = projid == 0 || iter->pccd_projid == projid;
+               bool id_ok = archive_id == 0 || iter->pccd_id == archive_id;
+
+               if (projid_ok && id_ok) {
+                       atomic_inc(&iter->pccd_refcount);
+                       match = iter;
+                       break;
+               }
+       }
+       spin_unlock(&super->pccs_lock);
+
+       if (match != NULL)
+               CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
+                      match->pccd_projid);
+       return match;
+}
+
+/*
+ * Drop one reference on @dataset. The final put releases the root path
+ * and frees the dataset.
+ */
+void
+pcc_dataset_put(struct pcc_dataset *dataset)
+{
+       if (!atomic_dec_and_test(&dataset->pccd_refcount))
+               return;
+
+       path_put(&dataset->pccd_path);
+       OBD_FREE_PTR(dataset);
+}
+
+/*
+ * Remove the dataset whose stored pathname equals @pathname from @super
+ * and drop the list's reference on it.
+ *
+ * Return: 0 if a dataset was removed, -ENOENT if none matched.
+ */
+static int
+pcc_dataset_del(struct pcc_super *super, char *pathname)
+{
+       struct pcc_dataset *dataset;
+       struct pcc_dataset *victim = NULL;
+
+       spin_lock(&super->pccs_lock);
+       list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
+               if (strcmp(dataset->pccd_pathname, pathname) == 0) {
+                       list_del(&dataset->pccd_linkage);
+                       victim = dataset;
+                       break;
+               }
+       }
+       spin_unlock(&super->pccs_lock);
+
+       if (victim == NULL)
+               return -ENOENT;
+
+       /*
+        * The final put calls path_put(), which may sleep, so it must not
+        * run with the spinlock held.
+        */
+       pcc_dataset_put(victim);
+       return 0;
+}
+
+/* Emit the pathname, archive ID and project ID of @dataset into @m. */
+static void
+pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
+{
+       const char *root = dataset->pccd_pathname;
+       __u32 rwid = dataset->pccd_id;
+       __u32 projid = dataset->pccd_projid;
+
+       seq_printf(m, "%s:\n", root);
+       seq_printf(m, "  rwid: %u\n", rwid);
+       seq_printf(m, "  autocache: projid=%u\n", projid);
+}
+
+/* Dump every dataset registered in @super into @m. Always returns 0. */
+int
+pcc_super_dump(struct pcc_super *super, struct seq_file *m)
+{
+       struct pcc_dataset *entry;
+
+       spin_lock(&super->pccs_lock);
+       list_for_each_entry(entry, &super->pccs_datasets, pccd_linkage)
+               pcc_dataset_dump(entry, m);
+       spin_unlock(&super->pccs_lock);
+
+       return 0;
+}
+
+/*
+ * Remove every dataset from @super and drop the list's reference on each.
+ *
+ * Besides teardown this is also run for the "clear" command, so it can
+ * race with lookups and additions; the list is therefore detached under
+ * pccs_lock, and the references are dropped afterwards because the final
+ * pcc_dataset_put() calls path_put(), which may sleep.
+ */
+void pcc_super_fini(struct pcc_super *super)
+{
+       struct pcc_dataset *dataset, *tmp;
+       LIST_HEAD(doomed);
+
+       spin_lock(&super->pccs_lock);
+       list_splice_init(&super->pccs_datasets, &doomed);
+       spin_unlock(&super->pccs_lock);
+
+       list_for_each_entry_safe(dataset, tmp, &doomed, pccd_linkage) {
+               list_del(&dataset->pccd_linkage);
+               pcc_dataset_put(dataset);
+       }
+}
+
+
+/*
+ * A dataset pathname is accepted when it is non-NULL, non-empty, shorter
+ * than PATH_MAX and absolute (starts with '/').
+ */
+static bool pathname_is_valid(const char *pathname)
+{
+       size_t len;
+
+       if (pathname == NULL)
+               return false;
+
+       len = strlen(pathname);
+       return len > 0 && len < PATH_MAX && pathname[0] == '/';
+}
+
+/**
+ * pcc_cmd_parse - parse one PCC configuration command
+ *
+ * @buffer: NUL-terminated command text; it is split in place by strsep()
+ * @count: length of @buffer (not used by the parser)
+ *
+ * Accepted forms, as parsed below:
+ *   "clear" (prefix match)                  -> PCC_CLEAR_ALL
+ *   "add <pathname> <archive_id> <projid>"  -> PCC_ADD_DATASET
+ *   "del <pathname>"                        -> PCC_DEL_DATASET
+ *
+ * The returned command is allocated with OBD_ALLOC_PTR() and its
+ * pccc_pathname points into @buffer, so @buffer must stay valid for as
+ * long as the command is used.
+ *
+ * Return: the parsed command, ERR_PTR(-ENOMEM) or ERR_PTR(-EINVAL).
+ */
+static struct pcc_cmd *
+pcc_cmd_parse(char *buffer, unsigned long count)
+{
+       /*
+        * Deliberately not static: a static pointer would be shared by
+        * concurrent callers, which could then free or use each other's
+        * command.
+        */
+       struct pcc_cmd *cmd;
+       char *token;
+       char *val;
+       unsigned long tmp;
+       int rc = 0;
+
+       OBD_ALLOC_PTR(cmd);
+       if (cmd == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       /* clear all setting */
+       if (strncmp(buffer, "clear", 5) == 0) {
+               cmd->pccc_cmd = PCC_CLEAR_ALL;
+               GOTO(out, rc = 0);
+       }
+
+       val = buffer;
+       token = strsep(&val, " ");
+       if (val == NULL || strlen(val) == 0)
+               GOTO(out_free_cmd, rc = -EINVAL);
+
+       /* Type of the command */
+       if (strcmp(token, "add") == 0)
+               cmd->pccc_cmd = PCC_ADD_DATASET;
+       else if (strcmp(token, "del") == 0)
+               cmd->pccc_cmd = PCC_DEL_DATASET;
+       else
+               GOTO(out_free_cmd, rc = -EINVAL);
+
+       /* Pathname of the dataset; "del" takes no further arguments */
+       token = strsep(&val, " ");
+       if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
+           !pathname_is_valid(token))
+               GOTO(out_free_cmd, rc = -EINVAL);
+       cmd->pccc_pathname = token;
+
+       if (cmd->pccc_cmd == PCC_ADD_DATASET) {
+               /* archive ID */
+               token = strsep(&val, " ");
+               if (val == NULL)
+                       GOTO(out_free_cmd, rc = -EINVAL);
+
+               rc = kstrtoul(token, 10, &tmp);
+               if (rc != 0)
+                       GOTO(out_free_cmd, rc = -EINVAL);
+               /* Zero is invalid; the ID is handed on as a __u32 */
+               if (tmp == 0 || tmp != (__u32)tmp)
+                       GOTO(out_free_cmd, rc = -EINVAL);
+               cmd->u.pccc_add.pccc_id = tmp;
+
+               /* project ID: whatever remains of the command */
+               token = val;
+               rc = kstrtoul(token, 10, &tmp);
+               if (rc != 0)
+                       GOTO(out_free_cmd, rc = -EINVAL);
+               if (tmp == 0 || tmp != (__u32)tmp)
+                       GOTO(out_free_cmd, rc = -EINVAL);
+               cmd->u.pccc_add.pccc_projid = tmp;
+       }
+
+       goto out;
+out_free_cmd:
+       OBD_FREE_PTR(cmd);
+out:
+       if (rc)
+               cmd = ERR_PTR(rc);
+       return cmd;
+}
+
+/*
+ * Parse the command in @buffer and apply it to @super.
+ *
+ * Returns the parse error, the result of the add/del operation, 0 for
+ * "clear", or -EINVAL for an unrecognised command type.
+ */
+int pcc_cmd_handle(char *buffer, unsigned long count,
+                  struct pcc_super *super)
+{
+       struct pcc_cmd *cmd = pcc_cmd_parse(buffer, count);
+       int rc;
+
+       if (IS_ERR(cmd))
+               return PTR_ERR(cmd);
+
+       if (cmd->pccc_cmd == PCC_ADD_DATASET) {
+               rc = pcc_dataset_add(super, cmd->pccc_pathname,
+                                     cmd->u.pccc_add.pccc_id,
+                                     cmd->u.pccc_add.pccc_projid);
+       } else if (cmd->pccc_cmd == PCC_DEL_DATASET) {
+               rc = pcc_dataset_del(super, cmd->pccc_pathname);
+       } else if (cmd->pccc_cmd == PCC_CLEAR_ALL) {
+               pcc_super_fini(super);
+               rc = 0;
+       } else {
+               rc = -EINVAL;
+       }
+
+       OBD_FREE_PTR(cmd);
+       return rc;
+}
+
+/* Acquire lli_pcc_lock of @inode. */
+static inline void pcc_inode_lock(struct inode *inode)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+
+       mutex_lock(&lli->lli_pcc_lock);
+}
+
+/* Release lli_pcc_lock of @inode. */
+static inline void pcc_inode_unlock(struct inode *inode)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+
+       mutex_unlock(&lli->lli_pcc_lock);
+}
+
+/* Put a freshly allocated @pcci into the "not attached" state. */
+static void pcc_inode_init(struct pcc_inode *pcci)
+{
+       pcci->pcci_type = LU_PCC_NONE;
+       atomic_set(&pcci->pcci_refcount, 0);
+}
+
+/* Release the cache path held by @pcci and return it to the slab. */
+static void pcc_inode_fini(struct pcc_inode *pcci)
+{
+       pcci->pcci_type = LU_PCC_NONE;
+       path_put(&pcci->pcci_path);
+       OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
+}
+
+/* Take one more reference on @pcci. */
+static void pcc_inode_get(struct pcc_inode *pcci)
+{
+       atomic_inc(&pcci->pcci_refcount);
+}
+
+/* Drop one reference on @pcci; the final put tears it down. */
+static void pcc_inode_put(struct pcc_inode *pcci)
+{
+       if (!atomic_dec_and_test(&pcci->pcci_refcount))
+               return;
+
+       pcc_inode_fini(pcci);
+}
+
+/*
+ * Detach and release the PCC state hanging off @inode, if any. More than
+ * one outstanding reference at this point triggers a warning.
+ */
+void pcc_inode_free(struct inode *inode)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci = lli->lli_pcc_inode;
+
+       if (pcci == NULL)
+               return;
+
+       WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
+       pcc_inode_put(pcci);
+       lli->lli_pcc_inode = NULL;
+}
+
+/*
+ * TODO:
+ * As Andreas suggested, we'd better use new layout to
+ * reduce overhead:
+ * (fid->f_oid >> 16 & 0xFFFF)/FID
+ */
+/* Six "xxxx/" components, the FID itself and the terminating NUL */
+#define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
+/*
+ * Format the dataset-relative path of @fid into @buf: the two 16-bit
+ * halves of f_oid, then the four 16-bit quarters of f_seq (lowest first),
+ * each as a 4-digit hex directory, followed by the FID without braces.
+ * Returns the snprintf() result.
+ */
+static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
+{
+       unsigned int oid_lo = fid->f_oid & 0xFFFF;
+       unsigned int oid_hi = (fid->f_oid >> 16) & 0xFFFF;
+       unsigned int seq_0 = (unsigned int)(fid->f_seq & 0xFFFF);
+       unsigned int seq_1 = (unsigned int)((fid->f_seq >> 16) & 0xFFFF);
+       unsigned int seq_2 = (unsigned int)((fid->f_seq >> 32) & 0xFFFF);
+       unsigned int seq_3 = (unsigned int)((fid->f_seq >> 48) & 0xFFFF);
+
+       return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
+                       DFID_NOBRACE,
+                       oid_lo, oid_hi, seq_0, seq_1, seq_2, seq_3,
+                       PFID(fid));
+}
+
+/* Mark @pccf as having no PCC backing file. */
+void pcc_file_init(struct pcc_file *pccf)
+{
+       pccf->pccf_type = LU_PCC_NONE;
+       pccf->pccf_file = NULL;
+}
+
+/*
+ * If @inode is a regular file with an attached PCC copy, open that copy
+ * with the flags of @file and remember it in the file's private data.
+ * A reference on the pcc_inode is held for as long as the copy stays open.
+ *
+ * Returns 0 when nothing is attached or the open succeeded, otherwise the
+ * error from dentry_open() (-EINVAL if it returned NULL).
+ */
+int pcc_file_open(struct inode *inode, struct file *file)
+{
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct pcc_inode *pcci;
+       struct file *cache_file;
+       struct path *cache_path;
+       struct qstr *cache_name;
+       int rc = 0;
+
+       ENTRY;
+
+       if (!S_ISREG(inode->i_mode))
+               RETURN(0);
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
+               GOTO(out_unlock, rc = 0);
+
+       pcc_inode_get(pcci);
+       WARN_ON(pccf->pccf_file);
+
+       cache_path = &pcci->pcci_path;
+       cache_name = &cache_path->dentry->d_name;
+       CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", cache_name->len,
+              cache_name->name);
+#ifdef HAVE_DENTRY_OPEN_USE_PATH
+       cache_file = dentry_open(cache_path, file->f_flags, current_cred());
+#else
+       cache_file = dentry_open(cache_path->dentry, cache_path->mnt,
+                                file->f_flags, current_cred());
+#endif
+       if (!IS_ERR_OR_NULL(cache_file)) {
+               pccf->pccf_file = cache_file;
+               pccf->pccf_type = pcci->pcci_type;
+               GOTO(out_unlock, rc = 0);
+       }
+
+       /* Open failed: give back the reference taken above */
+       rc = cache_file == NULL ? -EINVAL : PTR_ERR(cache_file);
+       pcc_inode_put(pcci);
+
+out_unlock:
+       pcc_inode_unlock(inode);
+       RETURN(rc);
+}
+
+/*
+ * Undo pcc_file_open(): close the PCC backing file of @file, if one was
+ * opened, and drop the pcc_inode reference that open took.
+ */
+void pcc_file_release(struct inode *inode, struct file *file)
+{
+       struct pcc_inode *pcci;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct pcc_file *pccf;
+       struct path *path;
+       struct qstr *dname;
+
+       ENTRY;
+
+       if (!S_ISREG(inode->i_mode) || fd == NULL)
+               RETURN_EXIT;
+
+       pccf = &fd->fd_pcc_file;
+       pcc_inode_lock(inode);
+       if (pccf->pccf_file == NULL)
+               goto out;
+
+       pcci = ll_i2pcci(inode);
+       LASSERT(pcci);
+       path = &pcci->pcci_path;
+       dname = &path->dentry->d_name;
+       CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
+              dname->name);
+       pcc_inode_put(pcci);
+       fput(pccf->pccf_file);
+       pccf->pccf_file = NULL;
+out:
+       pcc_inode_unlock(inode);
+       /* Pair the ENTRY above on the normal exit path as well */
+       EXIT;
+}
+
+/*
+ * Serve a read from the PCC backing file when one is open.
+ *
+ * *cached tells the caller whether the read was handled here; when it is
+ * false the return value is 0 and carries no meaning. The iocb is pointed
+ * at the backing file only for the duration of the read.
+ */
+ssize_t pcc_file_read_iter(struct kiocb *iocb,
+                          struct iov_iter *iter, bool *cached)
+{
+       struct file *lustre_file = iocb->ki_filp;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(lustre_file);
+       struct pcc_file *pccf = &fd->fd_pcc_file;
+       ssize_t nread;
+
+       ENTRY;
+
+       *cached = pccf->pccf_file != NULL;
+       if (!*cached)
+               RETURN(0);
+
+       iocb->ki_filp = pccf->pccf_file;
+       nread = generic_file_read_iter(iocb, iter);
+       iocb->ki_filp = lustre_file;
+
+       RETURN(nread);
+}
+
+/*
+ * Issue a write through the file operations of iocb->ki_filp.
+ *
+ * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER the file's ->write_iter() is
+ * called directly. Otherwise each segment of @iter is written with
+ * ->aio_write(); the loop stops on the first error or short write, and
+ * @iter is advanced by the number of bytes actually written.
+ *
+ * Returns the number of bytes written, or the error/zero result of the
+ * first segment if nothing was written.
+ */
+static ssize_t
+__pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+       struct file *file = iocb->ki_filp;
+
+#ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
+       return file->f_op->write_iter(iocb, iter);
+#else
+       struct iovec iov;
+       struct iov_iter i;
+       ssize_t bytes = 0;
+
+       iov_for_each(iov, i, *iter) {
+               ssize_t res;
+
+               res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
+               /* Queued asynchronously: wait for completion to get a result */
+               if (-EIOCBQUEUED == res)
+                       res = wait_on_sync_kiocb(iocb);
+               if (res <= 0) {
+                       /* Report the failure only if nothing was written yet */
+                       if (bytes == 0)
+                               bytes = res;
+                       break;
+               }
+
+               bytes += res;
+               /* Short write: do not continue with the next segment */
+               if (res < iov.iov_len)
+                       break;
+       }
+
+       if (bytes > 0)
+               iov_iter_advance(iter, bytes);
+       return bytes;
+#endif
+}
+
+/*
+ * Serve a write from the PCC backing file when one is open.
+ *
+ * *cached tells the caller whether the write was handled here. A backing
+ * file that is not of type LU_PCC_READWRITE is reported as handled but
+ * fails with -EWOULDBLOCK.
+ */
+ssize_t pcc_file_write_iter(struct kiocb *iocb,
+                           struct iov_iter *iter, bool *cached)
+{
+       struct file *lustre_file = iocb->ki_filp;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(lustre_file);
+       struct pcc_file *pccf = &fd->fd_pcc_file;
+       ssize_t nwritten;
+
+       ENTRY;
+
+       *cached = pccf->pccf_file != NULL;
+       if (!*cached)
+               RETURN(0);
+
+       if (pccf->pccf_type != LU_PCC_READWRITE)
+               RETURN(-EWOULDBLOCK);
+
+       /* Since __pcc_file_write_iter makes write calls via
+        * the normal vfs interface to the local PCC file system,
+        * the inode lock is not needed.
+        */
+       iocb->ki_filp = pccf->pccf_file;
+       nwritten = __pcc_file_write_iter(iocb, iter);
+       iocb->ki_filp = lustre_file;
+
+       RETURN(nwritten);
+}
+
+/*
+ * Apply the size and timestamp parts of @attr to the PCC copy of @inode.
+ *
+ * *cached is always written: true when an attached PCC copy was updated
+ * (or the update was attempted), false when @inode is not a regular file
+ * or has no active PCC copy. Returns 0 in the "not cached" cases,
+ * otherwise the result of the backing inode's ->setattr().
+ */
+int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
+                     bool *cached)
+{
+       int rc;
+       struct pcc_inode *pcci;
+       struct iattr attr2 = *attr;
+       struct dentry *pcc_dentry;
+
+       ENTRY;
+
+       /* Never leave the caller's flag unset on the early-exit paths */
+       *cached = false;
+
+       if (!S_ISREG(inode->i_mode))
+               RETURN(0);
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
+               GOTO(out_unlock, rc = 0);
+
+       *cached = true;
+       /* Only size and time attributes are forwarded to the local copy */
+       attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
+                        ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
+                        ATTR_CTIME);
+       pcc_dentry = pcci->pcci_path.dentry;
+       inode_lock(pcc_dentry->d_inode);
+       rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
+       inode_unlock(pcc_dentry->d_inode);
+out_unlock:
+       pcc_inode_unlock(inode);
+       RETURN(rc);
+}
+
+/*
+ * Refresh size, block count and timestamps of @inode from its PCC copy.
+ *
+ * *cached is always written: true when an attached PCC copy was consulted,
+ * false when @inode is not a regular file or has no active PCC copy.
+ * Size and blocks are taken from the local copy; each timestamp becomes
+ * the later of the Lustre value and the local copy's value.
+ *
+ * Returns 0 in the "not cached" cases, otherwise the ll_vfs_getattr()
+ * result.
+ */
+int pcc_inode_getattr(struct inode *inode, bool *cached)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci;
+       struct kstat stat;
+       s64 atime;
+       s64 mtime;
+       s64 ctime;
+       int rc;
+
+       ENTRY;
+
+       /* Never leave the caller's flag unset on the early-exit paths */
+       *cached = false;
+
+       if (!S_ISREG(inode->i_mode))
+               RETURN(0);
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
+               GOTO(out_unlock, rc = 0);
+
+       *cached = true;
+       rc = ll_vfs_getattr(&pcci->pcci_path, &stat);
+       if (rc)
+               GOTO(out_unlock, rc);
+
+       ll_inode_size_lock(inode);
+       if (inode->i_atime.tv_sec < lli->lli_atime ||
+           lli->lli_update_atime) {
+               inode->i_atime.tv_sec = lli->lli_atime;
+               lli->lli_update_atime = 0;
+       }
+       inode->i_mtime.tv_sec = lli->lli_mtime;
+       inode->i_ctime.tv_sec = lli->lli_ctime;
+
+       atime = inode->i_atime.tv_sec;
+       mtime = inode->i_mtime.tv_sec;
+       ctime = inode->i_ctime.tv_sec;
+
+       /* Keep whichever timestamp is newer: Lustre's or the local copy's */
+       if (atime < stat.atime.tv_sec)
+               atime = stat.atime.tv_sec;
+
+       if (ctime < stat.ctime.tv_sec)
+               ctime = stat.ctime.tv_sec;
+
+       if (mtime < stat.mtime.tv_sec)
+               mtime = stat.mtime.tv_sec;
+
+       i_size_write(inode, stat.size);
+       inode->i_blocks = stat.blocks;
+
+       inode->i_atime.tv_sec = atime;
+       inode->i_mtime.tv_sec = mtime;
+       inode->i_ctime.tv_sec = ctime;
+
+       ll_inode_size_unlock(inode);
+
+out_unlock:
+       pcc_inode_unlock(inode);
+       RETURN(rc);
+}
+
+/*
+ * Look up @name under @base and create it as a directory if the lookup
+ * yields a negative dentry. Returns the dentry (existing or new) or an
+ * ERR_PTR from the lookup or from vfs_mkdir().
+ */
+static struct dentry *
+pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
+{
+       struct inode *dir = base->d_inode;
+       struct dentry *dentry;
+       int rc;
+
+       inode_lock(dir);
+       dentry = lookup_one_len(name, base, strlen(name));
+       if (!IS_ERR(dentry) && !d_is_positive(dentry)) {
+               rc = vfs_mkdir(dir, dentry, mode);
+               if (rc) {
+                       dput(dentry);
+                       dentry = ERR_PTR(rc);
+               }
+       }
+       inode_unlock(dir);
+
+       return dentry;
+}
+
+/*
+ * Create, below @root, every directory component of @path that is
+ * followed by a '/'; the final component (after the last '/') is not
+ * touched. @path is modified temporarily while it is being walked.
+ *
+ * Returns a referenced dentry of the deepest directory, an ERR_PTR from
+ * pcc_mkdir(), or ERR_PTR(-EINVAL) when @path has no directory component.
+ * No dentry reference is retained on any error return.
+ */
+static struct dentry *
+pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
+{
+       char *ptr, *entry_name;
+       struct dentry *parent;
+       struct dentry *child = ERR_PTR(-EINVAL);
+
+       ptr = path;
+       while (*ptr == '/')
+               ptr++;
+
+       entry_name = ptr;
+       parent = dget(root);
+       while ((ptr = strchr(ptr, '/')) != NULL) {
+               *ptr = '\0';
+               child = pcc_mkdir(parent, entry_name, mode);
+               *ptr = '/';
+               /* The parent is done with, whether or not mkdir succeeded */
+               dput(parent);
+               if (IS_ERR(child))
+                       return child;
+               parent = child;
+               ptr++;
+               entry_name = ptr;
+       }
+
+       /* Loop never ran: drop the reference taken on @root above */
+       if (IS_ERR(child))
+               dput(parent);
+
+       return child;
+}
+
+/*
+ * Look up @name under @base and create it as a regular file if the lookup
+ * yields a negative dentry. An already existing entry is returned as is.
+ * Returns the dentry or an ERR_PTR from the lookup or from vfs_create().
+ */
+static struct dentry *
+pcc_create(struct dentry *base, const char *name, umode_t mode)
+{
+       struct inode *dir = base->d_inode;
+       struct dentry *dentry;
+       int rc;
+
+       inode_lock(dir);
+       dentry = lookup_one_len(name, base, strlen(name));
+       if (!IS_ERR(dentry) && !d_is_positive(dentry)) {
+               rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
+               if (rc) {
+                       dput(dentry);
+                       dentry = ERR_PTR(rc);
+               }
+       }
+       inode_unlock(dir);
+
+       return dentry;
+}
+
+/*
+ * Turn an unattached @pcci (refcount 0) into an attached one: record the
+ * cache path (a new reference on the dataset's mount plus @dentry as
+ * passed in), set the refcount to 1 and store @type.
+ *
+ * The callers in this file invoke it with pcc_inode_lock() held.
+ */
+static void pcc_inode_attach_init(struct pcc_dataset *dataset,
+                                 struct pcc_inode *pcci,
+                                 struct dentry *dentry,
+                                 enum lu_pcc_type type)
+{
+       struct path *cache = &pcci->pcci_path;
+
+       cache->mnt = mntget(dataset->pccd_path.mnt);
+       cache->dentry = dentry;
+       LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
+       atomic_set(&pcci->pcci_refcount, 1);
+       pcci->pcci_attr_valid = false;
+       pcci->pcci_type = type;
+}
+
+/*
+ * Create the cache file for @fid inside @dataset: build the hashed
+ * directory chain below the dataset root, then create the file named
+ * after the FID in the deepest directory.
+ *
+ * On success 0 is returned and *dentry holds the file's dentry; on
+ * failure *dentry is left untouched and a negative errno is returned.
+ */
+static int __pcc_inode_create(struct pcc_dataset *dataset,
+                             struct lu_fid *fid,
+                             struct dentry **dentry)
+{
+       struct dentry *dir;
+       struct dentry *cache;
+       char *name;
+       int rc = 0;
+
+       OBD_ALLOC(name, MAX_PCC_DATABASE_PATH);
+       if (name == NULL)
+               return -ENOMEM;
+
+       pcc_fid2dataset_path(name, MAX_PCC_DATABASE_PATH, fid);
+
+       dir = pcc_mkdir_p(dataset->pccd_path.dentry, name, 0700);
+       if (IS_ERR(dir))
+               GOTO(out, rc = PTR_ERR(dir));
+
+       /* Reuse the buffer for the bare file name */
+       snprintf(name, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
+       cache = pcc_create(dir, name, 0600);
+       if (IS_ERR(cache))
+               rc = PTR_ERR(cache);
+       else
+               *dentry = cache;
+
+       dput(dir);
+out:
+       OBD_FREE(name, MAX_PCC_DATABASE_PATH);
+       return rc;
+}
+
+/* Exported entry point: create the cache file for @fid in @dataset. */
+int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
+                    struct dentry **pcc_dentry)
+{
+       int rc;
+
+       rc = __pcc_inode_create(dataset, fid, pcc_dentry);
+       return rc;
+}
+
+/*
+ * Attach @pcc_dentry to @inode as a read-write PCC copy. @inode must not
+ * have PCC state yet. Returns 0, or -ENOMEM if the pcc_inode cannot be
+ * allocated.
+ */
+int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
+                         struct dentry *pcc_dentry)
+{
+       struct pcc_inode *pcci;
+
+       ENTRY;
+
+       LASSERT(ll_i2pcci(inode) == NULL);
+       OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+       if (pcci == NULL)
+               RETURN(-ENOMEM);
+
+       pcc_inode_init(pcci);
+
+       pcc_inode_lock(inode);
+       pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
+       ll_i2info(inode)->lli_pcc_inode = pcci;
+       pcc_inode_unlock(inode);
+
+       RETURN(0);
+}
+
+/*
+ * Write all @count bytes of @buf to @filp at *offset, retrying after
+ * short writes.
+ *
+ * Returns 0 once everything is written, the negative error from
+ * vfs_write(), or -EIO if vfs_write() makes no progress.
+ */
+static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
+                         loff_t *offset)
+{
+       /* Byte pointer: arithmetic on void * is a GNU extension */
+       const char *cursor = buf;
+
+       while (count > 0) {
+               ssize_t size;
+
+               size = vfs_write(filp, (const void __user *)cursor, count,
+                                offset);
+               if (size < 0)
+                       return size;
+               /* A zero-length write would otherwise loop forever */
+               if (size == 0)
+                       return -EIO;
+               count -= size;
+               cursor += size;
+       }
+       return 0;
+}
+
+/*
+ * Copy the content of @src to @dst from offset 0 until vfs_read() reports
+ * end of file, going through a 1MiB kernel buffer. Returns 0 on success
+ * or the first error from the read or write side.
+ */
+static int pcc_copy_data(struct file *src, struct file *dst)
+{
+       size_t chunk = 1048576;
+       mm_segment_t saved_fs;
+       loff_t offset = 0;
+       loff_t pos;
+       ssize_t nread;
+       void *buf;
+       int rc = 0;
+
+       ENTRY;
+
+       OBD_ALLOC_LARGE(buf, chunk);
+       if (buf == NULL)
+               RETURN(-ENOMEM);
+
+       /* The buffer is kernel memory, so lift the user-pointer check */
+       saved_fs = get_fs();
+       set_fs(KERNEL_DS);
+       for (;;) {
+               pos = offset;
+               nread = vfs_read(src, (void __user *)buf, chunk, &pos);
+               if (nread <= 0) {
+                       /* 0 means end of file, negative is an error */
+                       rc = nread;
+                       break;
+               }
+
+               pos = offset;
+               rc = pcc_filp_write(dst, buf, nread, &pos);
+               if (rc < 0)
+                       break;
+               offset += nread;
+       }
+       set_fs(saved_fs);
+
+       OBD_FREE_LARGE(buf, chunk);
+       RETURN(rc);
+}
+
+/*
+ * Attach @inode to the read-write PCC dataset identified by @archive_id:
+ * create (or reuse) the cache file for the inode's FID, copy the content
+ * of @file into it and publish a new pcc_inode on the inode.
+ *
+ * The pcc_inode used here is always a private allocation that becomes
+ * visible through lli->lli_pcc_inode only on success, so every error path
+ * may free it without leaving a stale pointer on the inode.
+ *
+ * Return: 0 on success, -EEXIST if the inode already has PCC state,
+ * -ENOMEM, -ENOENT if no dataset matches @archive_id, or the error from
+ * creating, opening or filling the cache file.
+ */
+int pcc_readwrite_attach(struct file *file, struct inode *inode,
+                        __u32 archive_id)
+{
+       struct pcc_dataset *dataset;
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci;
+       struct dentry *dentry;
+       struct file *pcc_filp;
+       struct path path;
+       int rc;
+
+       ENTRY;
+
+       /*
+        * Early check to avoid a pointless copy. Any pcc_inode already
+        * hanging off the inode means "attached"; it must never be reused
+        * or freed from here. The check is repeated after the copy.
+        */
+       pcc_inode_lock(inode);
+       if (lli->lli_pcc_inode != NULL) {
+               pcc_inode_unlock(inode);
+               RETURN(-EEXIST);
+       }
+       pcc_inode_unlock(inode);
+
+       OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+       if (pcci == NULL)
+               RETURN(-ENOMEM);
+       pcc_inode_init(pcci);
+
+       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
+                                 archive_id);
+       if (dataset == NULL)
+               GOTO(out_free_pcci, rc = -ENOENT);
+
+       rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
+       if (rc)
+               GOTO(out_dataset_put, rc);
+
+       path.mnt = dataset->pccd_path.mnt;
+       path.dentry = dentry;
+#ifdef HAVE_DENTRY_OPEN_USE_PATH
+       pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
+                              current_cred());
+#else
+       pcc_filp = dentry_open(path.dentry, path.mnt,
+                              O_TRUNC | O_WRONLY | O_LARGEFILE,
+                              current_cred());
+#endif
+       if (IS_ERR_OR_NULL(pcc_filp)) {
+               rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
+               GOTO(out_dentry, rc);
+       }
+
+       rc = pcc_copy_data(file, pcc_filp);
+       if (rc)
+               GOTO(out_fput, rc);
+
+       pcc_inode_lock(inode);
+       /* Someone else attached the inode while the data was copied */
+       if (lli->lli_pcc_inode)
+               GOTO(out_unlock, rc = -EEXIST);
+       /* On success the dentry reference is handed over to pcci */
+       pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
+       lli->lli_pcc_inode = pcci;
+out_unlock:
+       pcc_inode_unlock(inode);
+out_fput:
+       fput(pcc_filp);
+out_dentry:
+       if (rc)
+               dput(dentry);
+out_dataset_put:
+       pcc_dataset_put(dataset);
+out_free_pcci:
+       /* pcci was never published on any failure path */
+       if (rc)
+               OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
+       RETURN(rc);
+}
+
+/*
+ * Finish an attach started by pcc_readwrite_attach(). When the overall
+ * operation failed (@rc != 0) or the lease was broken, and the attach had
+ * succeeded (@attached), the attach reference is dropped again.
+ *
+ * The put is done under the PCC inode lock, and when it is the last
+ * reference the inode's pointer is cleared first, so that no pointer to
+ * freed memory is left in lli->lli_pcc_inode.
+ *
+ * NOTE(review): assumes the caller does not already hold lli_pcc_lock;
+ * confirm against the ioctl path that calls this.
+ *
+ * Returns @rc unchanged.
+ */
+int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
+                             bool lease_broken, int rc, bool attached)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci;
+
+       ENTRY;
+
+       if ((rc == 0 && !lease_broken) || !attached)
+               RETURN(rc);
+
+       pcc_inode_lock(inode);
+       pcci = lli->lli_pcc_inode;
+       if (pcci != NULL) {
+               /* Last reference: unhook before pcc_inode_put() frees it */
+               if (atomic_read(&pcci->pcci_refcount) == 1)
+                       lli->lli_pcc_inode = NULL;
+               pcc_inode_put(pcci);
+       }
+       pcc_inode_unlock(inode);
+
+       RETURN(rc);
+}
+
+/*
+ * Detach the PCC copy from @inode.
+ *
+ * Return: 0 if the inode was detached or had nothing attached, -EBUSY if
+ * references beyond the attach reference are still held.
+ */
+int pcc_ioctl_detach(struct inode *inode)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci;
+       int rc = 0;
+       int count;
+
+       ENTRY;
+
+       pcc_inode_lock(inode);
+       /* Sample the pointer only under the lock: it may change before */
+       pcci = lli->lli_pcc_inode;
+       if (pcci == NULL)
+               GOTO(out_unlock, rc = 0);
+
+       count = atomic_read(&pcci->pcci_refcount);
+       if (count > 1)
+               GOTO(out_unlock, rc = -EBUSY);
+       else if (count == 0)
+               GOTO(out_unlock, rc = 0);
+
+       /* count == 1: this put frees pcci, so unhook it from the inode */
+       lli->lli_pcc_inode = NULL;
+       pcc_inode_put(pcci);
+out_unlock:
+       pcc_inode_unlock(inode);
+
+       RETURN(rc);
+}
+
+/*
+ * Report the PCC state of @inode in @state.
+ *
+ * Without an active PCC copy only pccs_type is written (LU_PCC_NONE).
+ * Otherwise type, open count (references minus the attach reference),
+ * flags and the cache path are filled in.
+ *
+ * Returns 0, -ENOMEM, the dentry_path_raw() error, or -ENAMETOOLONG if
+ * the path does not fit in pccs_path.
+ */
+int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
+{
+       int path_len = sizeof(state->pccs_path);
+       struct pcc_inode *pcci;
+       char *scratch;
+       char *path;
+       int refs;
+       int rc = 0;
+
+       ENTRY;
+
+       if (path_len <= 0)
+               RETURN(-EINVAL);
+
+       OBD_ALLOC(scratch, path_len);
+       if (scratch == NULL)
+               RETURN(-ENOMEM);
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       refs = pcci != NULL ? atomic_read(&pcci->pcci_refcount) : 0;
+       if (refs == 0) {
+               state->pccs_type = LU_PCC_NONE;
+               GOTO(out_unlock, rc = 0);
+       }
+
+       state->pccs_type = pcci->pcci_type;
+       state->pccs_open_count = refs - 1;
+       state->pccs_flags = pcci->pcci_attr_valid ?
+                           PCC_STATE_FLAG_ATTR_VALID : 0;
+#ifdef HAVE_DENTRY_PATH_RAW
+       path = dentry_path_raw(pcci->pcci_path.dentry, scratch, path_len);
+       if (IS_ERR(path))
+               GOTO(out_unlock, rc = PTR_ERR(path));
+#else
+       path = "UNKNOWN";
+#endif
+
+       if (strlcpy(state->pccs_path, path, path_len) >= path_len)
+               rc = -ENAMETOOLONG;
+
+out_unlock:
+       pcc_inode_unlock(inode);
+       OBD_FREE(scratch, path_len);
+       RETURN(rc);
+}
diff --git a/lustre/llite/pcc.h b/lustre/llite/pcc.h
new file mode 100644 (file)
index 0000000..7d3e8b4
--- /dev/null
@@ -0,0 +1,128 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2017, DDN Storage Corporation.
+ */
+/*
+ *
+ * Persistent Client Cache
+ *
+ * Author: Li Xi <lixi@ddn.com>
+ */
+
+#ifndef LLITE_PCC_H
+#define LLITE_PCC_H
+
+#include <linux/types.h>
+#include <linux/fs.h>
+#include <linux/seq_file.h>
+#include <uapi/linux/lustre/lustre_user.h>
+
+extern struct kmem_cache *pcc_inode_slab;
+
+#define LPROCFS_WR_PCC_MAX_CMD 4096
+
+struct pcc_dataset {
+       __u32                   pccd_id;         /* Archive ID */
+       __u32                   pccd_projid;     /* Project ID */
+       char                    pccd_pathname[PATH_MAX]; /* full path */
+       struct path             pccd_path;       /* Root path */
+       struct list_head        pccd_linkage;  /* Linked to pccs_datasets */
+       atomic_t                pccd_refcount; /* reference count */
+};
+
+struct pcc_super {
+       spinlock_t              pccs_lock;      /* Protect pccs_datasets */
+       struct list_head        pccs_datasets;  /* List of datasets */
+};
+
+struct pcc_inode {
+       /* Cache path on local file system */
+       struct path                      pcci_path;
+       /*
+        * A reference count of 0 means the cache is not initialized; a count
+        * of 1 means it is initialized but no one is using it.
+        */
+       atomic_t                         pcci_refcount;
+       /* Whether readonly or readwrite PCC */
+       enum lu_pcc_type                 pcci_type;
+       /* Whether the inode attributes are cached locally */
+       bool                             pcci_attr_valid;
+};
+
+struct pcc_file {
+       /* Opened cache file */
+       struct file             *pccf_file;
+       /* Whether readonly or readwrite PCC */
+       enum lu_pcc_type         pccf_type;
+};
+
+enum pcc_cmd_type {
+       PCC_ADD_DATASET = 0,
+       PCC_DEL_DATASET,
+       PCC_CLEAR_ALL,
+};
+
+struct pcc_cmd {
+       enum pcc_cmd_type                        pccc_cmd;
+       char                                    *pccc_pathname;
+       union {
+               struct pcc_cmd_add {
+                       __u32                    pccc_id;
+                       __u32                    pccc_projid;
+               } pccc_add;
+               struct pcc_cmd_del {
+                       __u32                    pccc_pad;
+               } pccc_del;
+       } u;
+};
+
+void pcc_super_init(struct pcc_super *super);
+void pcc_super_fini(struct pcc_super *super);
+int pcc_cmd_handle(char *buffer, unsigned long count,
+                  struct pcc_super *super);
+int
+pcc_super_dump(struct pcc_super *super, struct seq_file *m);
+int pcc_readwrite_attach(struct file *file,
+                        struct inode *inode, __u32 arch_id);
+int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
+                             bool lease_broken, int rc, bool attached);
+int pcc_ioctl_detach(struct inode *inode);
+int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state);
+void pcc_file_init(struct pcc_file *pccf);
+int pcc_file_open(struct inode *inode, struct file *file);
+void pcc_file_release(struct inode *inode, struct file *file);
+ssize_t pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter,
+                          bool *cached);
+ssize_t pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter,
+                           bool *cached);
+int pcc_inode_getattr(struct inode *inode, bool *cached);
+int pcc_inode_setattr(struct inode *inode, struct iattr *attr, bool *cached);
+int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
+                    struct dentry **pcc_dentry);
+int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
+                         struct dentry *pcc_dentry);
+struct pcc_dataset *
+pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id);
+void pcc_dataset_put(struct pcc_dataset *dataset);
+void pcc_inode_free(struct inode *inode);
+#endif /* LLITE_PCC_H */
index bd38776..a6fef2e 100644 (file)
@@ -121,6 +121,12 @@ static int __init lustre_init(void)
        if (ll_file_data_slab == NULL)
                GOTO(out_cache, rc = -ENOMEM);
 
+       pcc_inode_slab = kmem_cache_create("ll_pcc_inode",
+                                          sizeof(struct pcc_inode), 0,
+                                          SLAB_HWCACHE_ALIGN, NULL);
+       if (pcc_inode_slab == NULL)
+               GOTO(out_cache, rc = -ENOMEM);
+
        rc = llite_tunables_register();
        if (rc)
                GOTO(out_cache, rc);
@@ -163,6 +169,7 @@ out_tunables:
 out_cache:
        kmem_cache_destroy(ll_inode_cachep);
        kmem_cache_destroy(ll_file_data_slab);
+       kmem_cache_destroy(pcc_inode_slab);
        return rc;
 }
 
@@ -179,6 +186,7 @@ static void __exit lustre_exit(void)
 
        kmem_cache_destroy(ll_inode_cachep);
        kmem_cache_destroy(ll_file_data_slab);
+       kmem_cache_destroy(pcc_inode_slab);
 }
 
 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
index c98a852..7abc814 100644 (file)
@@ -124,11 +124,7 @@ static int ll_xattr_set_common(const struct xattr_handler *handler,
 
        if ((handler->flags == XATTR_ACL_ACCESS_T ||
             handler->flags == XATTR_ACL_DEFAULT_T) &&
-#ifdef HAVE_INODE_OWNER_OR_CAPABLE
            !inode_owner_or_capable(inode))
-#else
-           !is_owner_or_cap(inode))
-#endif
                RETURN(-EPERM);
 
        /* b10667: ignore lustre special xattr for now */
index 152eb7f..d0cd4a1 100644 (file)
@@ -124,11 +124,7 @@ int ll_setxattr_common(struct inode *inode, const char *name,
 
        if ((xattr_type == XATTR_ACL_ACCESS_T ||
             xattr_type == XATTR_ACL_DEFAULT_T) &&
-#ifdef HAVE_INODE_OWNER_OR_CAPABLE
            !inode_owner_or_capable(inode))
-#else
-           !is_owner_or_cap(inode))
-#endif
                return -EPERM;
 
        /* b10667: ignore lustre special xattr for now */
index 75be491..50ac0de 100644 (file)
@@ -355,7 +355,8 @@ retry:
                op_data->op_mds = tgt->ltd_idx;
        } else {
                LASSERT(fid_is_sane(&op_data->op_fid1));
-               LASSERT(fid_is_zero(&op_data->op_fid2));
+               LASSERT(it->it_flags & MDS_OPEN_PCC ||
+                       fid_is_zero(&op_data->op_fid2));
                LASSERT(op_data->op_name != NULL);
 
                tgt = lmv_locate_tgt(lmv, op_data, &op_data->op_fid1);
@@ -365,7 +366,8 @@ retry:
 
        /* If it is ready to open the file by FID, do not need
         * allocate FID at all, otherwise it will confuse MDT */
-       if ((it->it_op & IT_CREAT) && !(it->it_flags & MDS_OPEN_BY_FID)) {
+       if ((it->it_op & IT_CREAT) && !(it->it_flags & MDS_OPEN_BY_FID ||
+                                       it->it_flags & MDS_OPEN_PCC)) {
                /*
                 * For lookup(IT_CREATE) cases allocate new fid and setup FLD
                 * for it.
index 8278969..5dc6f1f 100644 (file)
@@ -3462,6 +3462,7 @@ struct obd_ops lmv_obd_ops = {
         .o_set_info_async       = lmv_set_info_async,
         .o_notify               = lmv_notify,
         .o_get_uuid             = lmv_get_uuid,
+       .o_fid_alloc            = lmv_fid_alloc,
         .o_iocontrol            = lmv_iocontrol,
         .o_quotactl             = lmv_quotactl
 };
index cd6f2ca..efa5469 100644 (file)
@@ -304,6 +304,10 @@ void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
                cr_flags |= MDS_OPEN_HAS_EA;
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
                memcpy(tmp, lmm, lmmlen);
+               if (cr_flags & MDS_OPEN_PCC) {
+                       LASSERT(op_data != NULL);
+                       rec->cr_archive_id = op_data->op_archive_id;
+               }
        }
        set_mrc_cr_flags(rec, cr_flags);
 }
@@ -514,6 +518,8 @@ static void mdc_close_intent_pack(struct ptlrpc_request *req,
                        memcpy(req_capsule_client_get(&req->rq_pill, &RMF_U32),
                                op_data->op_data, count * sizeof(__u32));
                }
+       } else if (bias & MDS_PCC_ATTACH) {
+               data->cd_archive_id = op_data->op_archive_id;
        }
 }
 
index 551c370..3a476a7 100644 (file)
@@ -2109,6 +2109,7 @@ static int mdd_declare_create_object(const struct lu_env *env,
                                     const struct md_op_spec *spec,
                                     struct lu_buf *def_acl_buf,
                                     struct lu_buf *acl_buf,
+                                    struct lu_buf *hsm_buf,
                                     struct dt_allocation_hint *hint)
 {
        const struct lu_buf *buf;
@@ -2155,6 +2156,14 @@ static int mdd_declare_create_object(const struct lu_env *env,
                                           0, handle);
                if (rc)
                        GOTO(out, rc);
+
+               if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+                       rc = mdo_declare_xattr_set(env, c, hsm_buf,
+                                                  XATTR_NAME_HSM,
+                                                  0, handle);
+                       if (rc)
+                               GOTO(out, rc);
+               }
        }
 
        if (S_ISLNK(attr->la_mode)) {
@@ -2191,12 +2200,13 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                              struct linkea_data *ldata,
                              struct lu_buf *def_acl_buf,
                              struct lu_buf *acl_buf,
+                             struct lu_buf *hsm_buf,
                              struct dt_allocation_hint *hint)
 {
        int rc;
 
        rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
-                                      def_acl_buf, acl_buf, hint);
+                                      def_acl_buf, acl_buf, hsm_buf, hint);
        if (rc)
                GOTO(out, rc);
 
@@ -2291,6 +2301,7 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                             struct mdd_object *son, struct lu_attr *attr,
                             struct md_op_spec *spec, struct lu_buf *acl_buf,
                             struct lu_buf *def_acl_buf,
+                            struct lu_buf *hsm_buf,
                             struct dt_allocation_hint *hint,
                             struct thandle *handle)
 {
@@ -2339,6 +2350,19 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                        GOTO(err_destroy, rc);
        }
 
+       if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
+               struct md_hsm mh;
+
+               memset(&mh, 0, sizeof(mh));
+               mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
+               mh.mh_arch_id = spec->sp_archive_id;
+               lustre_hsm2buf(hsm_buf->lb_buf, &mh);
+               rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
+                                  0, handle);
+               if (rc != 0)
+                       GOTO(err_destroy, rc);
+       }
+
 #ifdef CONFIG_FS_POSIX_ACL
        if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
            S_ISDIR(attr->la_mode)) {
@@ -2501,6 +2525,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        struct lu_attr          *pattr = &info->mti_pattr;
        struct lu_buf           acl_buf;
        struct lu_buf           def_acl_buf;
+       struct lu_buf           hsm_buf;
        struct linkea_data      *ldata = &info->mti_link_data;
        const char              *name = lname->ln_name;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
@@ -2562,9 +2587,18 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                                        lname, 1, 0, ldata);
        }
 
+       if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+               LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
+
+               memset(&hsm_buf, 0, sizeof(hsm_buf));
+               lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
+               if (hsm_buf.lb_buf == NULL)
+                       GOTO(out_stop, rc = -ENOMEM);
+       }
+
        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
                                handle, spec, ldata, &def_acl_buf, &acl_buf,
-                               hint);
+                               &hsm_buf, hint);
        if (rc)
                GOTO(out_stop, rc);
 
@@ -2573,7 +2607,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                GOTO(out_stop, rc);
 
        rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
-                              &def_acl_buf, hint, handle);
+                              &def_acl_buf, &hsm_buf, hint, handle);
        if (rc != 0)
                GOTO(out_stop, rc);
 
@@ -2664,6 +2698,9 @@ out_free:
                /* if we vmalloced a large buffer drop it */
                lu_buf_free(ldata->ld_buf);
 
+       if (spec->sp_cr_flags & MDS_OPEN_PCC)
+               lu_buf_free(&hsm_buf);
+
        /* The child object shouldn't be cached anymore */
        if (rc)
                set_bit(LU_OBJECT_HEARD_BANSHEE,
@@ -3905,7 +3942,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
 
        rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
                                lname, attr, handle, spec, ldata, NULL, NULL,
-                               hint);
+                               NULL, hint);
        if (rc)
                return rc;
 
@@ -4064,8 +4101,8 @@ static int mdd_migrate_create(const struct lu_env *env,
        /* don't set nlink from sobj */
        attr->la_valid &= ~LA_NLINK;
 
-       rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, hint,
-                               handle);
+       rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
+                              hint, handle);
        if (rc)
                RETURN(rc);
 
index a7d5130..020ff59 100644 (file)
@@ -1131,7 +1131,8 @@ static int mdt_setattr_unpack_rec(struct mdt_thread_info *info)
        ma->ma_valid = MA_INODE;
 
        ma->ma_attr_flags |= rec->sa_bias & (MDS_CLOSE_INTENT |
-                               MDS_DATA_MODIFIED | MDS_TRUNC_KEEP_LEASE);
+                               MDS_DATA_MODIFIED | MDS_TRUNC_KEEP_LEASE |
+                               MDS_PCC_ATTACH);
        RETURN(0);
 }
 
@@ -1619,6 +1620,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
                                                                &RMF_EADATA);
                         sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
                         sp->u.sp_ea.eadata = rr->rr_eadata;
+                       sp->sp_archive_id = rec->cr_archive_id;
                         sp->no_create = !!req_is_replay(req);
                        mdt_fix_lov_magic(info, rr->rr_eadata);
                 }
index 43d891c..bd20a5f 100644 (file)
@@ -1805,19 +1805,63 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        if (rc != 0)
                GOTO(out_unlock, rc);
 
-       if (!mdt_hsm_release_allow(ma))
-               GOTO(out_unlock, rc = -EPERM);
-
-       /* already released? */
-       if (ma->ma_hsm.mh_flags & HS_RELEASED)
-               GOTO(out_unlock, rc = 0);
-
-       /* Compare on-disk and packed data_version */
-       if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
-               CDEBUG(D_HSM, DFID" data_version mismatches: packed=%llu"
-                      " and on-disk=%llu\n", PFID(mdt_object_fid(o)),
-                      data->cd_data_version, ma->ma_hsm.mh_arch_ver);
-               GOTO(out_unlock, rc = -EPERM);
+       if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
+               if (ma->ma_valid & MA_HSM) {
+                       if (ma->ma_hsm.mh_flags & HS_RELEASED)
+                               GOTO(out_unlock, rc = -EALREADY);
+
+                       if (ma->ma_hsm.mh_arch_id != data->cd_archive_id)
+                               CDEBUG(D_CACHE,
+                                      DFID" archive id diff: %llu:%u\n",
+                                      PFID(mdt_object_fid(o)),
+                                      ma->ma_hsm.mh_arch_id,
+                                      data->cd_archive_id);
+
+                       if (!(ma->ma_hsm.mh_flags & HS_DIRTY) &&
+                           ma->ma_hsm.mh_arch_ver == data->cd_data_version) {
+                               CDEBUG(D_CACHE,
+                                      DFID" data version matches: packed=%llu "
+                                      "and on-disk=%llu\n",
+                                      PFID(mdt_object_fid(o)),
+                                      data->cd_data_version,
+                                      ma->ma_hsm.mh_arch_ver);
+                               ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+                       }
+               } else {
+                       /* Set up HSM attribute for PCC archived object */
+                       CLASSERT(sizeof(struct hsm_attrs) <=
+                                sizeof(info->mti_xattr_buf));
+                       buf = &info->mti_buf;
+                       buf->lb_buf = info->mti_xattr_buf;
+                       buf->lb_len = sizeof(struct hsm_attrs);
+                       memset(&ma->ma_hsm, 0, sizeof(ma->ma_hsm));
+                       ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+                       ma->ma_hsm.mh_arch_id = data->cd_archive_id;
+                       ma->ma_hsm.mh_arch_ver = data->cd_data_version;
+                       lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+
+                       rc = mo_xattr_set(info->mti_env, mdt_object_child(o),
+                                         buf, XATTR_NAME_HSM, 0);
+                       if (rc)
+                               GOTO(out_unlock, rc);
+               }
+       } else {
+               if (!mdt_hsm_release_allow(ma))
+                       GOTO(out_unlock, rc = -EPERM);
+
+               /* already released? */
+               if (ma->ma_hsm.mh_flags & HS_RELEASED)
+                       GOTO(out_unlock, rc = 0);
+
+               /* Compare on-disk and packed data_version */
+               if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+                       CDEBUG(D_HSM, DFID" data_version mismatches: "
+                              "packed=%llu and on-disk=%llu\n",
+                              PFID(mdt_object_fid(o)),
+                              data->cd_data_version,
+                              ma->ma_hsm.mh_arch_ver);
+                       GOTO(out_unlock, rc = -EPERM);
+               }
        }
 
        ma->ma_valid = MA_INODE;
index 68fd6ae..f65c116 100644 (file)
@@ -39,7 +39,7 @@ noinst_SCRIPTS += posix.sh sanity-scrub.sh scrub-performance.sh ha.sh
 noinst_SCRIPTS += sanity-lfsck.sh lfsck-performance.sh
 noinst_SCRIPTS += resolveip
 noinst_SCRIPTS += sanity-hsm.sh sanity-lsnapshot.sh sanity-pfl.sh sanity-flr.sh
-noinst_SCRIPTS += sanity-dom.sh dom-performance.sh
+noinst_SCRIPTS += sanity-dom.sh sanity-pcc.sh dom-performance.sh
 nobase_noinst_SCRIPTS = cfg/local.sh
 nobase_noinst_SCRIPTS += test-groups/regression test-groups/regression-mpi
 nobase_noinst_SCRIPTS += acl/make-tree acl/run cfg/ncli.sh
index 46a96d6..b759e49 100755 (executable)
@@ -42,60 +42,6 @@ CLIENT2=${CLIENT2:-$CLIENT1}
 
 is_mounted $MOUNT2 || error "MOUNT2 is not mounted"
 
-rmultiop_start() {
-       local client=$1
-       local file=$2
-       local cmds=$3
-       local WAIT_MAX=${4:-60}
-       local wait_time=0
-
-       # We need to run do_node in bg, because pdsh does not exit
-       # if child process of run script exists.
-       # I.e. pdsh does not exit when runmultiop_bg_pause exited,
-       # because of multiop_bg_pause -> $MULTIOP_PROG &
-       # By the same reason we need sleep a bit after do_nodes starts
-       # to let runmultiop_bg_pause start muliop and
-       # update /tmp/multiop_bg.pid ;
-       # The rm /tmp/multiop_bg.pid guarantees here that
-       # we have the updated by runmultiop_bg_pause
-       # /tmp/multiop_bg.pid file
-
-       local pid_file=$TMP/multiop_bg.pid.$$
-       do_node $client "MULTIOP_PID_FILE=$pid_file LUSTRE= \
-                       runmultiop_bg_pause $file $cmds" &
-       local pid=$!
-       local multiop_pid
-
-       while [[ $wait_time -lt $WAIT_MAX ]]; do
-               sleep 3
-               wait_time=$((wait_time + 3))
-               multiop_pid=$(do_node $client cat $pid_file)
-               if [ -n "$multiop_pid" ]; then
-                       break
-               fi
-       done
-
-       [ -n "$multiop_pid" ] ||
-               error "$client : Can not get multiop_pid from $pid_file "
-
-       eval export $(node_var_name $client)_multiop_pid=$multiop_pid
-       eval export $(node_var_name $client)_do_node_pid=$pid
-       local var=$(node_var_name $client)_multiop_pid
-       echo client $client multiop_bg started multiop_pid=${!var}
-       return $?
-}
-
-rmultiop_stop() {
-    local client=$1
-    local multiop_pid=$(node_var_name $client)_multiop_pid
-    local do_node_pid=$(node_var_name $client)_do_node_pid
-
-    echo "Stopping multiop_pid=${!multiop_pid} (kill ${!multiop_pid} on $client)"
-    do_node $client kill -USR1 ${!multiop_pid}
-
-    wait ${!do_node_pid}
-}
-
 #
 # get_version(): Gets the version of an object on servers
 # Parameter1: Client/Machine Name
index 97a5129..7e5a0b9 100755 (executable)
@@ -226,27 +226,6 @@ copytool_monitor_setup() {
        fi
 }
 
-copytool_monitor_cleanup() {
-       local facet=${1:-$SINGLEAGT}
-       local agent=$(facet_active_host $facet)
-
-       if [ -n "$HSMTOOL_MONITOR_DIR" ]; then
-               # Should die when the copytool dies, but just in case.
-               local cmd="kill \\\$(cat $HSMTOOL_MONITOR_DIR/monitor_pid)"
-               cmd+=" 2>/dev/null || true"
-               do_node $agent "$cmd"
-               do_node $agent "rm -fr $HSMTOOL_MONITOR_DIR"
-               export HSMTOOL_MONITOR_DIR=
-       fi
-
-       # The pdsh should die on its own when the monitor dies. Just
-       # in case, though, try to clean up to avoid any cruft.
-       if [ -n "$HSMTOOL_MONITOR_PDSH" ]; then
-               kill $HSMTOOL_MONITOR_PDSH 2>/dev/null || true
-               export HSMTOOL_MONITOR_PDSH=
-       fi
-}
-
 fid2archive()
 {
        local fid="$1"
@@ -258,134 +237,6 @@ fid2archive()
        esac
 }
 
-copytool_logfile()
-{
-       local host="$(facet_host "$1")"
-       local prefix=$TESTLOG_PREFIX
-       [ -n "$TESTNAME" ] && prefix+=.$TESTNAME
-
-       printf "${prefix}.copytool${archive_id}_log.${host}.log"
-}
-
-__lhsmtool_rebind()
-{
-       do_facet $facet $HSMTOOL -p "$hsm_root" --rebind "$@" "$mountpoint"
-}
-
-__lhsmtool_import()
-{
-       mkdir -p "$(dirname "$2")" ||
-               error "cannot create directory '$(dirname "$2")'"
-       do_facet $facet $HSMTOOL -p "$hsm_root" --import "$@" "$mountpoint"
-}
-
-__lhsmtool_setup()
-{
-       local cmd="$HSMTOOL $HSMTOOL_VERBOSE --daemon --hsm-root \"$hsm_root\""
-       [ -n "$bandwidth" ] && cmd+=" --bandwidth $bandwidth"
-       [ -n "$archive_id" ] && cmd+=" --archive $archive_id"
-       [ ${#misc_options[@]} -gt 0 ] &&
-               cmd+=" $(IFS=" " echo "$@")"
-       cmd+=" \"$mountpoint\""
-
-       echo "Starting copytool $facet on $(facet_host $facet)"
-       stack_trap "do_facet $facet libtool execute pkill -x '$HSMTOOL' || true" EXIT
-       do_facet $facet "$cmd < /dev/null > \"$(copytool_logfile $facet)\" 2>&1"
-}
-
-hsm_root() {
-       local facet="${1:-$SINGLEAGT}"
-
-       printf "$(copytool_device "$facet")/${TESTSUITE}.${TESTNAME}/"
-}
-
-# Main entry point to perform copytool related operations
-#
-# Sub-commands:
-#
-#      setup   setup a copytool to run in the background, that copytool will be
-#              killed on EXIT
-#      import  import a file from an HSM backend
-#      rebind  rebind an archived file to a new fid
-#
-# Although the semantics might suggest otherwise, one does not need to 'setup'
-# a copytool before a call to 'copytool import' or 'copytool rebind'.
-#
-copytool()
-{
-       local action=$1
-       shift
-
-       # Parse arguments
-       local fail_on_error=true
-       local -a misc_options
-       while [ $# -gt 0 ]; do
-               case "$1" in
-               -f|--facet)
-                       shift
-                       local facet="$1"
-                       ;;
-               -m|--mountpoint)
-                       shift
-                       local mountpoint="$1"
-                       ;;
-               -a|--archive-id)
-                       shift
-                       local archive_id="$1"
-                       ;;
-               -b|--bwlimit)
-                       shift
-                       local bandwidth="$1" # in MB/s
-                       ;;
-               -n|--no-fail)
-                       local fail_on_error=false
-                       ;;
-               *)
-                       # Uncommon(/copytool dependent) option
-                       misc_options+=("$1")
-                       ;;
-               esac
-               shift
-       done
-
-       # Use default values if needed
-       local facet=${facet:-$SINGLEAGT}
-       local mountpoint="${mountpoint:-${MOUNT2:-$MOUNT}}"
-       local hsm_root="$(hsm_root "$facet")"
-
-       stack_trap "do_facet $facet rm -rf '$hsm_root'" EXIT
-       do_facet $facet mkdir -p "$hsm_root" ||
-               error "mkdir '$hsm_root' failed"
-
-       case "$HSMTOOL" in
-       lhsmtool_posix)
-               local copytool=lhsmtool
-               ;;
-       esac
-
-       __${copytool}_${action} "${misc_options[@]}"
-       if [ $? -ne 0 ]; then
-               local error_msg
-
-               case $action in
-               setup)
-                       local host="$(facet_host $facet)"
-                       error_msg="Failed to start copytool $facet on '$host'"
-                       ;;
-               import)
-                       local src="${misc_options[0]}"
-                       local dest="${misc_options[1]}"
-                       error_msg="Failed to import '$src' to '$dest'"
-                       ;;
-               rebind)
-                       error_msg="could not rebind file"
-                       ;;
-               esac
-
-               $fail_on_error && error "$error_msg" || echo "$error_msg"
-       fi
-}
-
 get_copytool_event_log() {
        local facet=${1:-$SINGLEAGT}
        local agent=$(facet_active_host $facet)
@@ -508,55 +359,12 @@ copy2archive() {
                error "cannot copy '$1' to '$file'"
 }
 
-mdts_set_param() {
-       local arg=$1
-       local key=$2
-       local value=$3
-       local mdtno
-       local rc=0
-       if [[ "$value" != "" ]]; then
-               value="=$value"
-       fi
-       for mdtno in $(seq 1 $MDSCOUNT); do
-               local idx=$(($mdtno - 1))
-               local facet=mds${mdtno}
-               # if $arg include -P option, run 1 set_param per MDT on the MGS
-               # else, run set_param on each MDT
-               [[ $arg = *"-P"* ]] && facet=mgs
-               do_facet $facet $LCTL set_param $arg mdt.${MDT[$idx]}.$key$value
-               [[ $? != 0 ]] && rc=1
-       done
-       return $rc
-}
-
-mdts_check_param() {
-       local key="$1"
-       local target="$2"
-       local timeout="$3"
-       local mdtno
-       for mdtno in $(seq 1 $MDSCOUNT); do
-               local idx=$(($mdtno - 1))
-               wait_result mds${mdtno} \
-                       "$LCTL get_param -n $MDT_PREFIX${idx}.$key" "$target" \
-                       $timeout ||
-                       error "$key state is not '$target' on mds${mdtno}"
-       done
-}
-
 get_hsm_param() {
        local param=$1
        local val=$(do_facet $SINGLEMDS $LCTL get_param -n $HSM_PARAM.$param)
        echo $val
 }
 
-set_hsm_param() {
-       local param=$1
-       local value=$2
-       local opt=$3
-       mdts_set_param "$opt -n" "hsm.$param" "$value"
-       return $?
-}
-
 set_test_state() {
        local cmd=$1
        local target=$2
@@ -564,15 +372,6 @@ set_test_state() {
        mdts_check_param hsm_control "$target" 10
 }
 
-cdt_set_sanity_policy() {
-       if [[ "$CDT_POLICY_HAD_CHANGED" ]]
-       then
-               # clear all
-               mdts_set_param "" hsm.policy "+NRA"
-               mdts_set_param "" hsm.policy "-NBR"
-               CDT_POLICY_HAD_CHANGED=
-       fi
-}
 
 cdt_set_no_retry() {
        mdts_set_param "" hsm.policy "+NRA"
@@ -598,21 +397,6 @@ cdt_clear_mount_state() {
        mdts_set_param "-P -d" hsm_control ""
 }
 
-cdt_set_mount_state() {
-       mdts_set_param "-P" hsm_control "$1"
-       # set_param -P is asynchronous operation and could race with set_param.
-       # In such case configs could be retrieved and applied at mgc after
-       # set_param -P completion. Sleep here to avoid race with set_param.
-       # We need at least 20 seconds. 10 for mgc_requeue_thread to wake up
-       # MGC_TIMEOUT_MIN_SECONDS + MGC_TIMEOUT_RAND_CENTISEC(5 + 5)
-       # and 10 seconds to retrieve config from server.
-       sleep 20
-}
-
-cdt_check_state() {
-       mdts_check_param hsm_control "$1" 20
-}
-
 cdt_disable() {
        set_test_state disabled disabled
 }
@@ -635,37 +419,6 @@ cdt_restart() {
        cdt_set_sanity_policy
 }
 
-needclients() {
-       local client_count=$1
-       if [[ $CLIENTCOUNT -lt $client_count ]]; then
-               skip "Need $client_count or more clients, have $CLIENTCOUNT"
-               return 1
-       fi
-       return 0
-}
-
-path2fid() {
-       $LFS path2fid $1 | tr -d '[]'
-       return ${PIPESTATUS[0]}
-}
-
-get_hsm_flags() {
-       local f=$1
-       local u=$2
-       local st
-
-       if [[ $u == "user" ]]; then
-               st=$($RUNAS $LFS hsm_state $f)
-       else
-               u=root
-               st=$($LFS hsm_state $f)
-       fi
-
-       [[ $? == 0 ]] || error "$LFS hsm_state $f failed (run as $u)"
-
-       st=$(echo $st | cut -f 2 -d" " | tr -d "()," )
-       echo $st
-}
 
 get_hsm_archive_id() {
        local f=$1
@@ -677,14 +430,6 @@ get_hsm_archive_id() {
        echo $ar
 }
 
-check_hsm_flags() {
-       local f=$1
-       local fl=$2
-
-       local st=$(get_hsm_flags $f)
-       [[ $st == $fl ]] || error "hsm flags on $f are $st != $fl"
-}
-
 check_hsm_flags_user() {
        local f=$1
        local fl=$2
@@ -721,27 +466,6 @@ delete_large_files() {
        wait_delete_completed
 }
 
-wait_result() {
-       local facet=$1
-       shift
-       wait_update --verbose $(facet_active_host $facet) "$@"
-}
-
-wait_request_state() {
-       local fid=$1
-       local request=$2
-       local state=$3
-       # 4th arg (mdt index) is optional
-       local mdtidx=${4:-0}
-       local mds=mds$(($mdtidx + 1))
-
-       local cmd="$LCTL get_param -n ${MDT_PREFIX}${mdtidx}.hsm.actions"
-       cmd+=" | awk '/'$fid'.*action='$request'/ {print \\\$13}' | cut -f2 -d="
-
-       wait_result $mds "$cmd" "$state" 200 ||
-               error "request on $fid is not $state on $mds"
-}
-
 get_request_state() {
        local fid=$1
        local request=$2
diff --git a/lustre/tests/sanity-pcc.sh b/lustre/tests/sanity-pcc.sh
new file mode 100644 (file)
index 0000000..205c0eb
--- /dev/null
@@ -0,0 +1,363 @@
+#!/bin/bash
+#
+# Run select tests by setting ONLY, or as arguments to the script.
+# Skip specific tests by setting EXCEPT.
+#
+# exit on error
+set -e
+set +o monitor
+
+SRCDIR=$(dirname $0)
+export PATH=$PWD/$SRCDIR:$SRCDIR:$PWD/$SRCDIR/utils:$PATH:/sbin:/usr/sbin
+
+ONLY=${ONLY:-"$*"}
+# bug number for skipped test:
+ALWAYS_EXCEPT=""
+# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
+
+ENABLE_PROJECT_QUOTAS=${ENABLE_PROJECT_QUOTAS:-true}
+
+LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
+
+. $LUSTRE/tests/test-framework.sh
+init_test_env $@
+. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
+init_logging
+
+MULTIOP=${MULTIOP:-multiop}
+OPENFILE=${OPENFILE:-openfile}
+MOUNT_2=${MOUNT_2:-"yes"}
+FAIL_ON_ERROR=false
+
+# script only handles up to 10 MDTs (because of MDT_PREFIX)
+[ $MDSCOUNT -gt 9 ] &&
+       error "script cannot handle more than 9 MDTs, please fix" && exit
+
+check_and_setup_lustre
+
+if [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.12.52) ]]; then
+       skip_env "Need MDS version at least 2.12.52" && exit
+fi
+
+# $RUNAS_ID may get set incorrectly somewhere else
+if [[ $UID -eq 0 && $RUNAS_ID -eq 0 ]]; then
+       skip_env "\$RUNAS_ID set to 0, but \$UID is also 0!" && exit
+fi
+check_runas_id $RUNAS_ID $RUNAS_GID $RUNAS
+if getent group nobody; then
+       GROUP=nobody
+elif getent group nogroup; then
+       GROUP=nogroup
+else
+       error "No generic nobody group"
+fi
+
+build_test_filter
+
+# if there is no CLIENT1 defined, some tests can be run on localhost
+CLIENT1=${CLIENT1:-$HOSTNAME}
+# if CLIENT2 doesn't exist then use CLIENT1 instead
+# All tests should use CLIENT2 with MOUNT2 only therefore it will work if
+# $CLIENT2 == CLIENT1
+# Exception is the test which need two separate nodes
+CLIENT2=${CLIENT2:-$CLIENT1}
+
+check_file_size()
+{
+       local client="$1"
+       local fpath="$2"
+       local expected_size="$3"
+
+       size=$(do_facet $client stat "--printf=%s" $fpath)
+       [[ $size == "$expected_size" ]] || error \
+               "expected $fpath size: $expected_size got: $size"
+}
+
+check_lpcc_sizes()
+{
+       local client="$1"
+       local lpcc_fpath="$2"
+       local lustre_fpath="$3"
+       local expected_size="$4"
+
+       check_file_size $client $lpcc_fpath $expected_size
+       check_file_size $client $lustre_fpath $expected_size
+}
+
+check_file_data()
+{
+       local client="$1"
+       local path="$2"
+       local expected_data="$3"
+
+       path_data=$(do_facet $client cat $path)
+       [[ "x$path_data" == "x$expected_data" ]] || error \
+               "expected $path: $expected_data, got: $path_data"
+}
+
+check_lpcc_data()
+{
+       local client="$1"
+       local lpcc_fpath="$2"
+       local lustre_fpath="$3"
+       local expected_data="$4"
+
+       check_file_data  "$client" "$lpcc_fpath" "$expected_data"
+       check_file_data  "$client" "$lustre_fpath" "$expected_data"
+}
+
+lpcc_fid2path()
+{
+       local hsm_root="$1"
+       local lustre_path="$2"
+       local fid=$(path2fid $lustre_path)
+
+       local -a f_seq
+       local -a f_oid
+       local -a f_ver
+
+       f_seq=$(echo $fid | awk -F ':' '{print $1}')
+       f_oid=$(echo $fid | awk -F ':' '{print $2}')
+       f_ver=$(echo $fid | awk -F ':' '{print $3}')
+
+       printf "%s/%04x/%04x/%04x/%04x/%04x/%04x/%s" \
+               $hsm_root $(($f_oid & 0xFFFF)) \
+               $(($f_oid >> 16 & 0xFFFF)) \
+               $(($f_seq & 0xFFFF)) \
+               $(($f_seq >> 16 & 0xFFFF)) \
+               $(($f_seq >> 32 & 0xFFFF)) \
+               $(($f_seq >> 48 & 0xFFFF)) $fid
+}
+
+check_lpcc_state()
+{
+       local lustre_path="$1"
+       local expected_state="$2"
+       local state=$(do_facet $SINGLEAGT $LFS pcc state $lustre_path |
+                       awk -F 'type: ' '{print $2}' | awk -F ',' '{print $1}')
+
+       [[ "x$state" == "x$expected_state" ]] || error \
+               "$lustre_path expected pcc state: $expected_state, but got: $state"
+}
+
+# initiate variables
+init_agt_vars
+
+# populate MDT device array
+get_mdt_devices
+
+# cleanup from previous bad setup
+kill_copytools
+
+# for recovery tests, coordinator needs to be started at mount
+# so force it
+# the lustre conf must be without hsm on (like for sanity.sh)
+echo "Set HSM on and start"
+cdt_set_mount_state enabled
+cdt_check_state enabled
+
+echo "Set sanity-hsm HSM policy"
+cdt_set_sanity_policy
+
+# finished requests are quickly removed from list
+set_hsm_param grace_delay 10
+
+cleanup_pcc_mapping() {
+       do_facet $SINGLEAGT $LCTL pcc clear $MOUNT
+}
+
+setup_pcc_mapping() {
+       local hsm_root=$(hsm_root)
+
+       cleanup_pcc_mapping
+       do_facet $SINGLEAGT $LCTL pcc add $MOUNT $hsm_root \
+               -p "$HSM_ARCHIVE_NUMBER\ 100"
+}
+
+lpcc_rw_test() {
+       local restore="$1"
+       local project="$2"
+       local project_id=100
+       local agt_facet=$SINGLEAGT
+       local hsm_root=$(hsm_root)
+       local file=$DIR/$tdir/$tfile
+       local -a state
+       local -a lpcc_path
+       local -a size
+
+       $project && enable_project_quota
+
+       do_facet $SINGLEAGT rm -rf $hsm_root
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+
+       is_project_quota_supported || project=false
+
+       do_facet $SINGLEAGT mkdir -p $DIR/$tdir
+       setup_pcc_mapping
+       $project && lfs project -sp $project_id $DIR/$tdir
+
+       do_facet $SINGLEAGT "echo -n attach_origin > $file"
+       if ! $project; then
+               check_lpcc_state $file "none"
+               do_facet $SINGLEAGT $LFS pcc attach -i \
+                       $HSM_ARCHIVE_NUMBER $file ||
+                       error "pcc attach $file failed"
+       fi
+
+       check_lpcc_state $file "readwrite"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       lpcc_path=$(lpcc_fid2path $hsm_root $file)
+       check_lpcc_data $SINGLEAGT $lpcc_path $file "attach_origin"
+
+       do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=7654321 count=1
+       check_lpcc_sizes $SINGLEAGT $lpcc_path $file 7654321
+
+       do_facet $SINGLEAGT $TRUNCATE $file 1234567 ||
+               error "truncate failed"
+       check_lpcc_sizes $SINGLEAGT $lpcc_path $file 1234567
+       check_lpcc_state $file "readwrite"
+
+       do_facet $SINGLEAGT "echo -n file_data > $file"
+       check_lpcc_state $file "readwrite"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       check_lpcc_data $SINGLEAGT $lpcc_path $file "file_data"
+       # run $restore as a command: a bare "false" inside [ ] tests as true
+       if [ $CLIENTCOUNT -lt 2 ] || $restore; then
+               $LFS hsm_restore $file || error \
+                       "failed to restore $file"
+               wait_request_state $(path2fid $file) RESTORE SUCCEED
+       else
+               path_data=$(do_node $CLIENT2 cat $file)
+               [[ "x$path_data" == "xfile_data" ]] || error \
+                       "expected file_data, got: $path_data"
+       fi
+
+       check_lpcc_state $file "none"
+       # HSM exists archived status
+       check_hsm_flags $file "0x00000009"
+
+       echo -n "new_data" > $file
+       check_lpcc_state $file "none"
+       # HSM exists dirty archived status
+       check_hsm_flags $file "0x0000000b"
+       check_file_data $SINGLEAGT $file "new_data"
+
+       echo "Attach and detach testing"
+       rm -f $file
+       do_facet $SINGLEAGT "echo -n new_data2 > $file"
+       if ! $project; then
+               check_lpcc_state $file "none"
+               do_facet $SINGLEAGT $LFS pcc attach -i \
+                       $HSM_ARCHIVE_NUMBER $file ||
+                       error "PCC attach $file failed"
+       fi
+       check_lpcc_state $file "readwrite"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       do_facet $SINGLEAGT "echo -n attach_detach > $file"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "PCC detach $file failed"
+       check_lpcc_state $file "none"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       check_file_data $SINGLEAGT $file "attach_detach"
+
+       cleanup_pcc_mapping
+}
+
+test_1a() {
+       lpcc_rw_test true false
+}
+run_test 1a "Test manual lfs pcc attach with manual HSM restore"
+
+test_1b() {
+       lpcc_rw_test false false
+}
+run_test 1b "Test manual lfs pcc attach with restore on remote access"
+
+test_1c() {
+       lpcc_rw_test true true
+}
+run_test 1c "Test automated attach using Project ID with manual HSM restore"
+
+test_1d() {
+       lpcc_rw_test false true
+}
+run_test 1d "Test Project ID with remote access"
+
+
+#
+# When a process has created an LPCC file and is holding it open,
+# another process on the same client should be able to open the file.
+#
+test_2() {
+       local project_id=100
+       local agt_facet=$SINGLEAGT
+       local hsm_root=$(hsm_root)
+       local agt_host=$(facet_active_host $SINGLEAGT)
+
+       ! is_project_quota_supported &&
+               skip "project quota is not supported" && return
+
+       enable_project_quota
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+       file=$DIR/$tdir/multiop
+       mkdir -p $DIR/$tdir
+       rm -f $file
+
+       do_facet $SINGLEAGT $LFS project -sp $project_id $DIR/$tdir ||
+               error "failed to set project quota"
+       rmultiop_start $agt_host $file O_c || error "open $file failed"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       do_facet $SINGLEAGT "echo -n multiopen_data > $file" ||
+               error "failed to echo multiopen_data to $file"
+
+       lpcc_path=$(lpcc_fid2path $hsm_root $file)
+       do_facet $SINGLEAGT ls -l $lpcc_path ||
+               error "failed to ls $lpcc_path"
+       check_lpcc_data $SINGLEAGT $lpcc_path $file "multiopen_data"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+
+       rmultiop_stop $agt_host || error "close $file failed"
+       cleanup_pcc_mapping
+}
+run_test 2 "Test multi open when creating"
+
+test_3() {
+       local file=$DIR/$tdir/$tfile
+
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+       dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+
+       echo "Start to attach/detach the file: $file"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+
+       echo "Repeat to attach/detach the same file: $file"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+
+       cleanup_pcc_mapping
+}
+run_test 3 "Repeat attach/detach operations"
+
+complete $SECONDS
+check_and_cleanup_lustre
+exit_status
index 3336688..ee01ef1 100755 (executable)
@@ -73,24 +73,6 @@ export QUOTA_AUTO=0
 check_and_setup_lustre
 
 ENABLE_PROJECT_QUOTAS=${ENABLE_PROJECT_QUOTAS:-true}
-is_project_quota_supported() {
-       $ENABLE_PROJECT_QUOTAS || return 1
-       [ "$(facet_fstype $SINGLEMDS)" == "ldiskfs" ] &&
-               [ $(lustre_version_code $SINGLEMDS) -gt \
-               $(version_code 2.9.55) ] &&
-               lfs --help | grep project >&/dev/null &&
-               egrep -q "7." /etc/redhat-release && return 0
-
-       if [ "$(facet_fstype $SINGLEMDS)" == "zfs" ]; then
-               [ $(lustre_version_code $SINGLEMDS) -le \
-                       $(version_code 2.10.53) ] && return 1
-
-               do_facet mds1 $ZPOOL upgrade -v |
-                       grep project_quota && return 0
-       fi
-
-       return 1
-}
 
 SHOW_QUOTA_USER="$LFS quota -v -u $TSTUSR $DIR"
 SHOW_QUOTA_USERID="$LFS quota -v -u $TSTID $DIR"
@@ -351,25 +333,6 @@ wait_ost_reint() {
        return 0
 }
 
-disable_project_quota() {
-       is_project_quota_supported || return 0
-       [ "$(facet_fstype $SINGLEMDS)" != "ldiskfs" ] && return 0
-       stopall || error "failed to stopall (1)"
-
-       for num in $(seq $MDSCOUNT); do
-               do_facet mds$num $TUNE2FS -Q ^prj $(mdsdevname $num) ||
-                       error "tune2fs $(mdsdevname $num) failed"
-       done
-
-       for num in $(seq $OSTCOUNT); do
-               do_facet ost$num $TUNE2FS -Q ^prj $(ostdevname $num) ||
-                       error "tune2fs $(ostdevname $num) failed"
-       done
-
-       mount
-       setupall
-}
-
 setup_quota_test() {
        wait_delete_completed
        echo "Creating test directory"
@@ -421,25 +384,6 @@ quota_show_check() {
        fi
 }
 
-enable_project_quota() {
-       is_project_quota_supported || return 0
-       [ "$(facet_fstype $SINGLEMDS)" != "ldiskfs" ] && return 0
-       stopall || error "failed to stopall (1)"
-
-       for num in $(seq $MDSCOUNT); do
-               do_facet mds$num $TUNE2FS -O project $(mdsdevname $num) ||
-                       error "tune2fs $(mdsdevname $num) failed"
-       done
-
-       for num in $(seq $OSTCOUNT); do
-               do_facet ost$num $TUNE2FS -O project $(ostdevname $num) ||
-                       error "tune2fs $(ostdevname $num) failed"
-       done
-
-       mount
-       setupall
-}
-
 project_quota_enabled () {
        local rc=0
        for num in $(seq $MDSCOUNT); do
index 5b78d09..e749838 100755 (executable)
@@ -9476,3 +9476,518 @@ verify_yaml_layout() {
        [ "$layout1" == "$layout2" ] ||
                error "$msg_prefix $src/$dst layouts are not equal"
 }
+
+is_project_quota_supported() {
+       $ENABLE_PROJECT_QUOTAS || return 1
+       [ "$(facet_fstype $SINGLEMDS)" == "ldiskfs" ] &&
+               [ $(lustre_version_code $SINGLEMDS) -gt \
+               $(version_code 2.9.55) ] &&
+               lfs --help | grep project >&/dev/null &&
+               egrep -q "7." /etc/redhat-release && return 0
+
+       if [ "$(facet_fstype $SINGLEMDS)" == "zfs" ]; then
+               [ $(lustre_version_code $SINGLEMDS) -le \
+                       $(version_code 2.10.53) ] && return 1
+
+               do_facet mds1 $ZPOOL upgrade -v |
+                       grep project_quota && return 0
+       fi
+
+       return 1
+}
+
+enable_project_quota() {
+       is_project_quota_supported || return 0
+       [ "$(facet_fstype $SINGLEMDS)" != "ldiskfs" ] && return 0
+       stopall || error "failed to stopall (1)"
+
+       for num in $(seq $MDSCOUNT); do
+               do_facet mds$num $TUNE2FS -O project $(mdsdevname $num) ||
+                       error "tune2fs $(mdsdevname $num) failed"
+       done
+
+       for num in $(seq $OSTCOUNT); do
+               do_facet ost$num $TUNE2FS -O project $(ostdevname $num) ||
+                       error "tune2fs $(ostdevname $num) failed"
+       done
+
+       mount
+       setupall
+}
+
+disable_project_quota() {
+       is_project_quota_supported || return 0
+       [ "$(facet_fstype $SINGLEMDS)" != "ldiskfs" ] && return 0
+       stopall || error "failed to stopall (1)"
+
+       for num in $(seq $MDSCOUNT); do
+               do_facet mds$num $TUNE2FS -Q ^prj $(mdsdevname $num) ||
+                       error "tune2fs $(mdsdevname $num) failed"
+       done
+
+       for num in $(seq $OSTCOUNT); do
+               do_facet ost$num $TUNE2FS -Q ^prj $(ostdevname $num) ||
+                       error "tune2fs $(ostdevname $num) failed"
+       done
+
+       mount
+       setupall
+}
+
+#
+# In order to test multiple remote HSM agents, a new facet type named "AGT" and
+# the following associated variables are added:
+#
+# AGTCOUNT: number of agents
+# AGTDEV{N}: target HSM mount point (root path of the backend)
+# agt{N}_HOST: hostname of the agent agt{N}
+# SINGLEAGT: facet of the single agent
+#
+# The number of agents is initialized as the number of remote client nodes.
+# By default, only single copytool is started on a remote client/agent. If there
+# was no remote client, then the copytool will be started on the local client.
+#
+init_agt_vars() {
+       local n
+       local agent
+
+       export AGTCOUNT=${AGTCOUNT:-$((CLIENTCOUNT - 1))}
+       [[ $AGTCOUNT -gt 0 ]] || AGTCOUNT=1
+
+       export SHARED_DIRECTORY=${SHARED_DIRECTORY:-$TMP}
+       if [[ $CLIENTCOUNT -gt 1 ]] &&
+               ! check_shared_dir $SHARED_DIRECTORY $CLIENTS; then
+               skip_env "SHARED_DIRECTORY should be accessible"\
+                        "on all client nodes"
+               exit 0
+       fi
+
+       # We used to put the HSM archive in $SHARED_DIRECTORY but that
+       # meant NFS issues could hose sanity-hsm sessions. So now we
+       # use $TMP instead.
+       for n in $(seq $AGTCOUNT); do
+               eval export AGTDEV$n=\$\{AGTDEV$n:-"$TMP/arc$n"\}
+               agent=CLIENT$((n + 1))
+               if [[ -z "${!agent}" ]]; then
+                       [[ $CLIENTCOUNT -eq 1 ]] && agent=CLIENT1 ||
+                               agent=CLIENT2
+               fi
+               eval export agt${n}_HOST=\$\{agt${n}_HOST:-${!agent}\}
+               local var=agt${n}_HOST
+               [[ ! -z "${!var}" ]] || error "agt${n}_HOST is empty!"
+       done
+
+       export SINGLEAGT=${SINGLEAGT:-agt1}
+
+       export HSMTOOL=${HSMTOOL:-"lhsmtool_posix"}
+       export HSMTOOL_VERBOSE=${HSMTOOL_VERBOSE:-""}
+       export HSMTOOL_UPDATE_INTERVAL=${HSMTOOL_UPDATE_INTERVAL:=""}
+       export HSMTOOL_EVENT_FIFO=${HSMTOOL_EVENT_FIFO:=""}
+       export HSMTOOL_TESTDIR
+       export HSMTOOL_BASE=$(basename "$HSMTOOL" | cut -f1 -d" ")
+
+       HSM_ARCHIVE_NUMBER=2
+
+       # The test only support up to 10 MDTs
+       MDT_PREFIX="mdt.$FSNAME-MDT000"
+       HSM_PARAM="${MDT_PREFIX}0.hsm"
+
+       # archive is purged at copytool setup
+       HSM_ARCHIVE_PURGE=true
+
+       # Don't allow copytool error upon start/setup
+       HSMTOOL_NOERROR=false
+}
+
+# Get the backend root path for the given agent facet.
+copytool_device() {
+       local facet=$1
+       local dev=AGTDEV$(facet_number $facet)
+
+       echo -n ${!dev}
+}
+
+get_mdt_devices() {
+       local mdtno
+       # get MDT device for each mdc
+       for mdtno in $(seq 1 $MDSCOUNT); do
+               local idx=$(($mdtno - 1))
+               MDT[$idx]=$($LCTL get_param -n \
+                       mdc.$FSNAME-MDT000${idx}-mdc-*.mds_server_uuid |
+                       awk '{gsub(/_UUID/,""); print $1}' | head -n1)
+       done
+}
+
+search_copytools() {
+       local hosts=${1:-$(facet_active_host $SINGLEAGT)}
+       do_nodesv $hosts "pgrep -x $HSMTOOL_BASE"
+}
+
+kill_copytools() {
+       local hosts=${1:-$(facet_active_host $SINGLEAGT)}
+
+       echo "Killing existing copytools on $hosts"
+       do_nodesv $hosts "killall -q $HSMTOOL_BASE" || true
+}
+
+wait_copytools() {
+       local hosts=${1:-$(facet_active_host $SINGLEAGT)}
+       local wait_timeout=200
+       local wait_start=$SECONDS
+       local wait_end=$((wait_start + wait_timeout))
+       local sleep_time=100000 # 0.1 second
+
+       while ((SECONDS < wait_end)); do
+               if ! search_copytools $hosts; then
+                       echo "copytools stopped in $((SECONDS - wait_start))s"
+                       return 0
+               fi
+
+               echo "copytools still running on $hosts"
+               usleep $sleep_time
+               [ $sleep_time -lt 32000000 ] && # 32 seconds (value is in usec)
+                       sleep_time=$(bc <<< "$sleep_time * 2")
+       done
+
+       # try to dump Copytool's stack
+       do_nodesv $hosts "echo 1 >/proc/sys/kernel/sysrq ; " \
+                        "echo t >/proc/sysrq-trigger"
+
+       echo "copytools failed to stop in ${wait_timeout}s"
+
+       return 1
+}
+
+copytool_monitor_cleanup() {
+       local facet=${1:-$SINGLEAGT}
+       local agent=$(facet_active_host $facet)
+
+       if [ -n "$HSMTOOL_MONITOR_DIR" ]; then
+               # Should die when the copytool dies, but just in case.
+               local cmd="kill \\\$(cat $HSMTOOL_MONITOR_DIR/monitor_pid)"
+               cmd+=" 2>/dev/null || true"
+               do_node $agent "$cmd"
+               do_node $agent "rm -fr $HSMTOOL_MONITOR_DIR"
+               export HSMTOOL_MONITOR_DIR=
+       fi
+
+       # The pdsh should die on its own when the monitor dies. Just
+       # in case, though, try to clean up to avoid any cruft.
+       if [ -n "$HSMTOOL_MONITOR_PDSH" ]; then
+               kill $HSMTOOL_MONITOR_PDSH 2>/dev/null || true
+               export HSMTOOL_MONITOR_PDSH=
+       fi
+}
+
+copytool_logfile()
+{
+       local host="$(facet_host "$1")"
+       local prefix=$TESTLOG_PREFIX
+       [ -n "$TESTNAME" ] && prefix+=.$TESTNAME
+
+       printf "${prefix}.copytool${archive_id}_log.${host}.log"
+}
+
+__lhsmtool_rebind()
+{
+       do_facet $facet $HSMTOOL -p "$hsm_root" --rebind "$@" "$mountpoint"
+}
+
+__lhsmtool_import()
+{
+       mkdir -p "$(dirname "$2")" ||
+               error "cannot create directory '$(dirname "$2")'"
+       do_facet $facet $HSMTOOL -p "$hsm_root" --import "$@" "$mountpoint"
+}
+
+__lhsmtool_setup()
+{
+       local cmd="$HSMTOOL $HSMTOOL_VERBOSE --daemon --hsm-root \"$hsm_root\""
+       [ -n "$bandwidth" ] && cmd+=" --bandwidth $bandwidth"
+       [ -n "$archive_id" ] && cmd+=" --archive $archive_id"
+       [ ${#misc_options[@]} -gt 0 ] &&
+               cmd+=" $(IFS=" " echo "$@")"
+       cmd+=" \"$mountpoint\""
+
+       echo "Starting copytool $facet on $(facet_host $facet)"
+       stack_trap "do_facet $facet libtool execute pkill -x '$HSMTOOL' || true" EXIT
+       do_facet $facet "$cmd < /dev/null > \"$(copytool_logfile $facet)\" 2>&1"
+}
+
+hsm_root() {
+       local facet="${1:-$SINGLEAGT}"
+
+       printf "$(copytool_device "$facet")/${TESTSUITE}.${TESTNAME}/"
+}
+
+# Main entry point to perform copytool related operations
+#
+# Sub-commands:
+#
+#      setup   setup a copytool to run in the background, that copytool will be
+#              killed on EXIT
+#      import  import a file from an HSM backend
+#      rebind  rebind an archived file to a new fid
+#
+# Although the semantics might suggest otherwise, one does not need to 'setup'
+# a copytool before a call to 'copytool import' or 'copytool rebind'.
+#
+copytool()
+{
+       local action=$1
+       shift
+
+       # Parse arguments
+       local fail_on_error=true
+       local -a misc_options
+       while [ $# -gt 0 ]; do
+               case "$1" in
+               -f|--facet)
+                       shift
+                       local facet="$1"
+                       ;;
+               -m|--mountpoint)
+                       shift
+                       local mountpoint="$1"
+                       ;;
+               -a|--archive-id)
+                       shift
+                       local archive_id="$1"
+                       ;;
+               -b|--bwlimit)
+                       shift
+                       local bandwidth="$1" # in MB/s
+                       ;;
+               -n|--no-fail)
+                       local fail_on_error=false
+                       ;;
+               *)
+                       # Uncommon(/copytool dependent) option
+                       misc_options+=("$1")
+                       ;;
+               esac
+               shift
+       done
+
+       # Use default values if needed
+       local facet=${facet:-$SINGLEAGT}
+       local mountpoint="${mountpoint:-${MOUNT2:-$MOUNT}}"
+       local hsm_root="$(hsm_root "$facet")"
+
+       stack_trap "do_facet $facet rm -rf '$hsm_root'" EXIT
+       do_facet $facet mkdir -p "$hsm_root" ||
+               error "mkdir '$hsm_root' failed"
+
+       case "$HSMTOOL" in
+       lhsmtool_posix)
+               local copytool=lhsmtool
+               ;;
+       esac
+
+       __${copytool}_${action} "${misc_options[@]}"
+       if [ $? -ne 0 ]; then
+               local error_msg
+
+               case $action in
+               setup)
+                       local host="$(facet_host $facet)"
+                       error_msg="Failed to start copytool $facet on '$host'"
+                       ;;
+               import)
+                       local src="${misc_options[0]}"
+                       local dest="${misc_options[1]}"
+                       error_msg="Failed to import '$src' to '$dest'"
+                       ;;
+               rebind)
+                       error_msg="could not rebind file"
+                       ;;
+               esac
+
+               $fail_on_error && error "$error_msg" || echo "$error_msg"
+       fi
+}
+
+needclients() {
+       local client_count=$1
+       if [[ $CLIENTCOUNT -lt $client_count ]]; then
+               skip "Need $client_count or more clients, have $CLIENTCOUNT"
+               return 1
+       fi
+       return 0
+}
+
+path2fid() {
+       $LFS path2fid $1 | tr -d '[]'
+       return ${PIPESTATUS[0]}
+}
+
+get_hsm_flags() {
+       local f=$1
+       local u=$2
+       local st
+
+       if [[ $u == "user" ]]; then
+               st=$($RUNAS $LFS hsm_state $f)
+       else
+               u=root
+               st=$($LFS hsm_state $f)
+       fi
+
+       [[ $? == 0 ]] || error "$LFS hsm_state $f failed (run as $u)"
+
+       st=$(echo $st | cut -f 2 -d" " | tr -d "()," )
+       echo $st
+}
+
+check_hsm_flags() {
+       local f=$1
+       local fl=$2
+
+       local st=$(get_hsm_flags $f)
+       [[ $st == $fl ]] || error "hsm flags on $f are $st != $fl"
+}
+
+mdts_set_param() {
+       local arg=$1
+       local key=$2
+       local value=$3
+       local mdtno
+       local rc=0
+       if [[ "$value" != "" ]]; then
+               value="=$value"
+       fi
+       for mdtno in $(seq 1 $MDSCOUNT); do
+               local idx=$(($mdtno - 1))
+               local facet=mds${mdtno}
+               # if $arg include -P option, run 1 set_param per MDT on the MGS
+               # else, run set_param on each MDT
+               [[ $arg = *"-P"* ]] && facet=mgs
+               do_facet $facet $LCTL set_param $arg mdt.${MDT[$idx]}.$key$value
+               [[ $? != 0 ]] && rc=1
+       done
+       return $rc
+}
+
+wait_result() {
+       local facet=$1
+       shift
+       wait_update --verbose $(facet_active_host $facet) "$@"
+}
+
+mdts_check_param() {
+       local key="$1"
+       local target="$2"
+       local timeout="$3"
+       local mdtno
+       for mdtno in $(seq 1 $MDSCOUNT); do
+               local idx=$(($mdtno - 1))
+               wait_result mds${mdtno} \
+                       "$LCTL get_param -n $MDT_PREFIX${idx}.$key" "$target" \
+                       $timeout ||
+                       error "$key state is not '$target' on mds${mdtno}"
+       done
+}
+
+cdt_set_mount_state() {
+       mdts_set_param "-P" hsm_control "$1"
+       # set_param -P is asynchronous operation and could race with set_param.
+       # In such case configs could be retrieved and applied at mgc after
+       # set_param -P completion. Sleep here to avoid race with set_param.
+       # We need at least 20 seconds. 10 for mgc_requeue_thread to wake up
+       # MGC_TIMEOUT_MIN_SECONDS + MGC_TIMEOUT_RAND_CENTISEC(5 + 5)
+       # and 10 seconds to retrieve config from server.
+       sleep 20
+}
+
+cdt_check_state() {
+       mdts_check_param hsm_control "$1" 20
+}
+
+cdt_set_sanity_policy() {
+       if [[ "$CDT_POLICY_HAD_CHANGED" ]]
+       then
+               # clear all
+               mdts_set_param "" hsm.policy "+NRA"
+               mdts_set_param "" hsm.policy "-NBR"
+               CDT_POLICY_HAD_CHANGED=
+       fi
+}
+
+set_hsm_param() {
+       local param=$1
+       local value=$2
+       local opt=$3
+       mdts_set_param "$opt -n" "hsm.$param" "$value"
+       return $?
+}
+
+wait_request_state() {
+       local fid=$1
+       local request=$2
+       local state=$3
+       # 4th arg (mdt index) is optional
+       local mdtidx=${4:-0}
+       local mds=mds$(($mdtidx + 1))
+
+       local cmd="$LCTL get_param -n ${MDT_PREFIX}${mdtidx}.hsm.actions"
+       cmd+=" | awk '/'$fid'.*action='$request'/ {print \\\$13}' | cut -f2 -d="
+
+       wait_result $mds "$cmd" "$state" 200 ||
+               error "request on $fid is not $state on $mds"
+}
+
+
+rmultiop_start() {
+       local client=$1
+       local file=$2
+       local cmds=$3
+       local WAIT_MAX=${4:-60}
+       local wait_time=0
+
+       # We need to run do_node in bg, because pdsh does not exit
+       # if child process of run script exists.
+       # I.e. pdsh does not exit when runmultiop_bg_pause exited,
+       # because of multiop_bg_pause -> $MULTIOP_PROG &
+       # By the same reason we need sleep a bit after do_nodes starts
+       # to let runmultiop_bg_pause start muliop and
+       # update /tmp/multiop_bg.pid ;
+       # The rm /tmp/multiop_bg.pid guarantees here that
+       # we have the updated by runmultiop_bg_pause
+       # /tmp/multiop_bg.pid file
+
+       local pid_file=$TMP/multiop_bg.pid.$$
+
+       do_node $client "MULTIOP_PID_FILE=$pid_file LUSTRE= \
+                       runmultiop_bg_pause $file $cmds" &
+       local pid=$!
+       local multiop_pid
+
+       while [[ $wait_time -lt $WAIT_MAX ]]; do
+               sleep 3
+               wait_time=$((wait_time + 3))
+               multiop_pid=$(do_node $client cat $pid_file)
+               if [ -n "$multiop_pid" ]; then
+                       break
+               fi
+       done
+
+       [ -n "$multiop_pid" ] ||
+               error "$client : Can not get multiop_pid from $pid_file "
+
+       eval export $(node_var_name $client)_multiop_pid=$multiop_pid
+       eval export $(node_var_name $client)_do_node_pid=$pid
+       local var=$(node_var_name $client)_multiop_pid
+       echo client $client multiop_bg started multiop_pid=${!var}
+       return $?
+}
+
+rmultiop_stop() {
+       local client=$1
+       local multiop_pid=$(node_var_name $client)_multiop_pid
+       local do_node_pid=$(node_var_name $client)_do_node_pid
+
+       echo "Stopping multiop_pid=${!multiop_pid} (kill ${!multiop_pid} on $client)"
+       do_node $client kill -USR1 ${!multiop_pid}
+
+       wait ${!do_node_pid}
+}
index 86942e5..fb1d17c 100644 (file)
@@ -25,3 +25,4 @@ sanity-lfsck
 sanity-hsm
 sanity-lsnapshot
 sanity-pfl
+sanity-pcc
index 7726f27..75e6018 100644 (file)
@@ -106,7 +106,7 @@ liblustreapi_la_SOURCES = liblustreapi.c liblustreapi_hsm.c \
                          liblustreapi_kernelconn.c liblustreapi_param.c \
                          liblustreapi_mirror.c \
                          liblustreapi_ladvise.c liblustreapi_chlg.c \
-                         liblustreapi_heat.c
+                         liblustreapi_heat.c liblustreapi_pcc.c
 liblustreapi_la_LDFLAGS = $(LIBREADLINE) -version-info 1:0:0 \
                          -Wl,--version-script=liblustreapi.map
 liblustreapi_la_LIBADD = $(top_builddir)/libcfs/libcfs/libcfs.la
index 8f330d9..8504c72 100644 (file)
@@ -54,6 +54,36 @@ static int jt_opt_ignore_errors(int argc, char **argv) {
         return 0;
 }
 
+static int jt_pcc_list_commands(int argc, char **argv);
+static int jt_pcc(int argc, char **argv);
+
+/**
+ * command_t pccdev_cmdlist - lctl pcc commands.
+ *
+ * Table of the "lctl pcc" sub-commands: it is the command list passed to
+ * the parser by jt_pcc() and printed by jt_pcc_list_commands().
+ */
+command_t pccdev_cmdlist[] = {
+       { .pc_name = "add", .pc_func = jt_pcc_add,
+         .pc_help = "Add a PCC backend to a client.\n"
+               "usage: lctl pcc add <mntpath> <pccpath> [--param|-p <param>]\n"
+               "\tmntpath: Lustre mount point.\n"
+               "\tpccpath: Path of the PCC backend.\n"
+               "\tparam:   Setting parameters for PCC backend.\n" },
+       { .pc_name = "del", .pc_func = jt_pcc_del,
+         .pc_help = "Delete the specified PCC backend on a client.\n"
+               "usage: lctl pcc del <mntpath> <pccpath>\n" },
+       { .pc_name = "clear", .pc_func = jt_pcc_clear,
+         .pc_help = "Remove all PCC backend on a client.\n"
+               "usage: lctl pcc clear <mntpath>\n" },
+       { .pc_name = "list", .pc_func = jt_pcc_list,
+         .pc_help = "List all PCC backends on a client.\n"
+               "usage: lctl pcc list <mntpath>\n" },
+       { .pc_name = "list-commands", .pc_func = jt_pcc_list_commands,
+         .pc_help = "list commands supported by lctl pcc"},
+       { .pc_name = "help", .pc_func = Parser_help, .pc_help = "help" },
+       { .pc_name = "exit", .pc_func = Parser_quit, .pc_help = "quit" },
+       { .pc_name = "quit", .pc_func = Parser_quit, .pc_help = "quit" },
+       { .pc_help = NULL }
+};
+
 command_t cmdlist[] = {
        /* Metacommands */
        {"===== metacommands =======", NULL, 0, "metacommands"},
@@ -349,6 +379,15 @@ command_t cmdlist[] = {
         "deregister an existing changelog user\n"
         "usage: --device <mdtname> changelog_deregister <id>"},
 
+       /* Persistent Client Cache (PCC) commands; "pcc" dispatches to the
+        * sub-commands in pccdev_cmdlist[] through jt_pcc().
+        */
+       {"=== Persistent Client Cache ===", NULL, 0, "PCC user management"},
+       {"pcc", jt_pcc, pccdev_cmdlist,
+        "lctl commands used to interact with PCC features:\n"
+        "lctl pcc add    - add a PCC backend to a client\n"
+        "lctl pcc del    - delete a PCC backend on a client\n"
+        "lctl pcc clear  - remove all PCC backends on a client\n"
+        "lctl pcc list   - list all PCC backends on a client\n"},
+
        /* Device configuration commands */
        {"== device setup (these are not normally used post 1.4) ==",
                NULL, 0, "device config"},
@@ -534,6 +573,55 @@ command_t cmdlist[] = {
        { 0, 0, 0, NULL }
 };
 
+/**
+ * jt_pcc_list_commands() - List lctl pcc commands.
+ * @argc: The count of command line arguments.
+ * @argv: Array of strings for command line arguments.
+ *
+ * This function lists lctl pcc commands defined in pccdev_cmdlist[].
+ *
+ * Return: 0 on success.
+ */
+static int jt_pcc_list_commands(int argc, char **argv)
+{
+       char buffer[81] = "";
+
+       Parser_list_commands(pccdev_cmdlist, buffer, sizeof(buffer),
+                            NULL, 0, 4);
+
+       return 0;
+}
+
+/**
+ * jt_pcc() - Parse and execute lctl pcc commands.
+ * @argc: The count of lctl pcc command line arguments.
+ * @argv: Array of strings for lctl pcc command line arguments.
+ *
+ * This function parses lctl pcc commands and performs the
+ * corresponding functions specified in pccdev_cmdlist[].
+ *
+ * Return: 0 on success or an error code on failure.
+ */
+static int jt_pcc(int argc, char **argv)
+{
+       char cmd[PATH_MAX];
+       int rc = 0;
+
+       setlinebuf(stdout);
+
+       Parser_init("lctl-pcc > ", pccdev_cmdlist);
+
+       snprintf(cmd, sizeof(cmd), "%s %s", program_invocation_short_name,
+                argv[0]);
+       program_invocation_short_name = cmd;
+       if (argc > 1)
+               rc = Parser_execarg(argc - 1, argv + 1, pccdev_cmdlist);
+       else
+               rc = Parser_commands();
+
+       return rc < 0 ? -rc : rc;
+}
+
 int lctl_main(int argc, char **argv)
 {
         int rc;
index 701868a..02bad77 100644 (file)
@@ -127,6 +127,12 @@ static inline int lfs_mirror_verify(int argc, char **argv);
 static inline int lfs_mirror_read(int argc, char **argv);
 static inline int lfs_mirror_write(int argc, char **argv);
 static inline int lfs_mirror_copy(int argc, char **argv);
+static int lfs_pcc_attach(int argc, char **argv);
+static int lfs_pcc_detach(int argc, char **argv);
+static int lfs_pcc_detach_fid(int argc, char **argv);
+static int lfs_pcc_state(int argc, char **argv);
+static int lfs_pcc(int argc, char **argv);
+static int lfs_pcc_list_commands(int argc, char **argv);
 
 enum setstripe_origin {
        SO_SETSTRIPE,
@@ -325,6 +331,31 @@ command_t mirror_cmdlist[] = {
        { .pc_help = NULL }
 };
 
+/**
+ * command_t pcc_cmdlist - lfs pcc commands.
+ */
+command_t pcc_cmdlist[] = {
+       { .pc_name = "attach", .pc_func = lfs_pcc_attach,
+         .pc_help = "Attach given files to the Persistent Client Cache.\n"
+               "usage: lfs pcc attach <--id|-i NUM> <file> ...\n"
+               "\t-i: archive id for RW-PCC\n" },
+       { .pc_name = "state", .pc_func = lfs_pcc_state,
+         .pc_help = "Display the PCC state for given files.\n"
+               "usage: lfs pcc state <file> ...\n" },
+       { .pc_name = "detach", .pc_func = lfs_pcc_detach,
+         .pc_help = "Detach given files from the Persistent Client Cache.\n"
+               "usage: lfs pcc detach <file> ...\n" },
+       { .pc_name = "detach_fid", .pc_func = lfs_pcc_detach_fid,
+         .pc_help = "Detach given files from PCC by FID(s).\n"
+               "usage: lfs pcc detach_fid <mntpath> <fid>...\n" },
+       { .pc_name = "list-commands", .pc_func = lfs_pcc_list_commands,
+         .pc_help = "list commands supported by lfs pcc"},
+       { .pc_name = "help", .pc_func = Parser_help, .pc_help = "help" },
+       { .pc_name = "exit", .pc_func = Parser_quit, .pc_help = "quit" },
+       { .pc_name = "quit", .pc_func = Parser_quit, .pc_help = "quit" },
+       { .pc_help = NULL }
+};
+
 /* all available commands */
 command_t cmdlist[] = {
        {"setstripe", lfs_setstripe, 0,
@@ -630,6 +661,12 @@ command_t cmdlist[] = {
         "\t--clear|-c: Clear file heat for given files\n"
         "\t--off|-o:   Turn off file heat for given files\n"
         "\t--on|-O:    Turn on file heat for given files\n"},
+       {"pcc", lfs_pcc, pcc_cmdlist,
+        "lfs commands used to interact with PCC features:\n"
+        "lfs pcc attach - attach given files to Persistent Client Cache\n"
+        "lfs pcc state  - display the PCC state for given files\n"
+        "lfs pcc detach - detach given files from Persistent Client Cache\n"
+        "lfs pcc detach_fid - detach given files from PCC by FID(s)\n"},
        {"help", Parser_help, 0, "help"},
        {"exit", Parser_quit, 0, "quit"},
        {"quit", Parser_quit, 0, "quit"},
@@ -10332,6 +10369,243 @@ static int lfs_mirror_list_commands(int argc, char **argv)
        return 0;
 }
 
+static int lfs_pcc_attach(int argc, char **argv)
+{
+       struct option long_opts[] = {
+       { .val = 'i',   .name = "id",   .has_arg = required_argument },
+       { .name = NULL } };
+       int c;
+       int rc = 0;
+       __u32 archive_id = 0;
+       const char *path;
+       char *end;
+       char fullpath[PATH_MAX];
+       enum lu_pcc_type type = LU_PCC_READWRITE;
+
+       optind = 0;
+       while ((c = getopt_long(argc, argv, "i:",
+                               long_opts, NULL)) != -1) {
+               switch (c) {
+               case 'i':
+                       archive_id = strtoul(optarg, &end, 0);
+                       if (*end != '\0' || archive_id == 0) {
+                               fprintf(stderr, "error: %s: bad archive ID "
+                                       "'%s'\n", argv[0], optarg);
+                               return CMD_HELP;
+                       }
+                       break;
+               case '?':
+                       return CMD_HELP;
+               default:
+                       fprintf(stderr, "%s: option '%s' unrecognized\n",
+                               argv[0], argv[optind - 1]);
+                       return CMD_HELP;
+               }
+       }
+
+       if (argc <= optind) {
+               fprintf(stderr, "%s: must specify one or more file names\n",
+                       argv[0]);
+               return CMD_HELP;
+       }
+
+       while (optind < argc) {
+               int rc2;
+
+               path = argv[optind++];
+               if (realpath(path, fullpath) == NULL) {
+                       fprintf(stderr, "%s: could not find path '%s': %s\n",
+                               argv[0], path, strerror(errno));
+                       if (rc == 0)
+                               rc = -EINVAL;
+                       continue;
+               }
+
+               rc2 = llapi_pcc_attach(fullpath, archive_id, type);
+               if (rc2 < 0) {
+                       fprintf(stderr, "%s: cannot attach '%s' to PCC "
+                               "with archive ID '%u': %s\n", argv[0],
+                               path, archive_id, strerror(-rc2));
+                       if (rc == 0)
+                               rc = rc2;
+               }
+       }
+       return rc;
+}
+
+static int lfs_pcc_detach(int argc, char **argv)
+{
+       int                      rc = 0;
+       const char              *path;
+       char                     fullpath[PATH_MAX];
+
+       optind = 1;
+
+       if (argc <= 1) {
+               fprintf(stderr, "%s: must specify one or more file names\n",
+                       argv[0]);
+               return CMD_HELP;
+       }
+
+       while (optind < argc) {
+               int rc2;
+
+               path = argv[optind++];
+               if (realpath(path, fullpath) == NULL) {
+                       fprintf(stderr, "%s: could not find path '%s': %s\n",
+                               argv[0], path, strerror(errno));
+                       if (rc == 0)
+                               rc = -EINVAL;
+                       continue;
+               }
+
+               rc2 = llapi_pcc_detach_file(fullpath);
+               if (rc2 < 0) {
+                       fprintf(stderr, "%s: cannot detach '%s' from PCC: "
+                               "%s\n", argv[0], path, strerror(-rc2));
+                       if (rc == 0)
+                               rc = rc2;
+               }
+       }
+       return rc;
+}
+
+static int lfs_pcc_detach_fid(int argc, char **argv)
+{
+       int              rc = 0;
+       const char      *fid;
+       const char      *mntpath;
+
+       optind = 1;
+
+       if (argc <= 2) {
+               fprintf(stderr, "%s: not enough argument\n",
+                       argv[0]);
+               return CMD_HELP;
+       }
+
+       mntpath = argv[optind++];
+
+       while (optind < argc) {
+               int rc2;
+
+               fid = argv[optind++];
+
+               rc2 = llapi_pcc_detach_fid_str(mntpath, fid);
+               if (rc2 < 0) {
+                       fprintf(stderr, "%s: cannot detach '%s' on '%s' "
+                               "from PCC: %s\n", argv[0], fid, mntpath,
+                               strerror(-rc2));
+                       if (rc == 0)
+                               rc = rc2;
+               }
+       }
+       return rc;
+}
+
+static int lfs_pcc_state(int argc, char **argv)
+{
+       int                      rc = 0;
+       const char              *path;
+       char                     fullpath[PATH_MAX];
+       struct lu_pcc_state      state;
+
+       optind = 1;
+
+       if (argc <= 1) {
+               fprintf(stderr, "%s: must specify one or more file names\n",
+                       argv[0]);
+               return CMD_HELP;
+       }
+
+       while (optind < argc) {
+               int rc2;
+
+               path = argv[optind++];
+               if (realpath(path, fullpath) == NULL) {
+                       fprintf(stderr, "%s: could not find path '%s': %s\n",
+                               argv[0], path, strerror(errno));
+                       if (rc == 0)
+                               rc = -EINVAL;
+                       continue;
+               }
+
+               rc2 = llapi_pcc_state_get(fullpath, &state);
+               if (rc2 < 0) {
+                       if (rc == 0)
+                               rc = rc2;
+                       fprintf(stderr, "%s: cannot get PCC state of '%s': "
+                               "%s\n", argv[0], path, strerror(-rc2));
+                       continue;
+               }
+
+               printf("file: %s", path);
+               printf(", type: %s", pcc_type2string(state.pccs_type));
+               if (state.pccs_type == LU_PCC_NONE &&
+                   state.pccs_open_count == 0) {
+                       printf("\n");
+                       continue;
+               }
+
+               printf(", PCC file: %s", state.pccs_path);
+               printf(", user number: %u", state.pccs_open_count);
+               printf(", attr cached: %s",
+                      state.pccs_flags &  PCC_STATE_FLAG_ATTR_VALID ?
+                      "true" : "false");
+               printf("\n");
+       }
+       return rc;
+}
+
+/**
+ * lfs_pcc_list_commands() - List lfs pcc commands.
+ * @argc: The count of command line arguments.
+ * @argv: Array of strings for command line arguments.
+ *
+ * This function lists lfs pcc commands defined in pcc_cmdlist[].
+ *
+ * Return: 0 on success.
+ */
+static int lfs_pcc_list_commands(int argc, char **argv)
+{
+       char buffer[81] = "";
+
+       Parser_list_commands(pcc_cmdlist, buffer, sizeof(buffer),
+                            NULL, 0, 4);
+
+       return 0;
+}
+
+/**
+ * lfs_pcc() - Parse and execute lfs pcc commands.
+ * @argc: The count of lfs pcc command line arguments.
+ * @argv: Array of strings for lfs pcc command line arguments.
+ *
+ * This function parses lfs pcc commands and performs the
+ * corresponding functions specified in pcc_cmdlist[].
+ *
+ * Return: 0 on success or an error code on failure.
+ */
+static int lfs_pcc(int argc, char **argv)
+{
+       char cmd[PATH_MAX];
+       int rc = 0;
+
+       setlinebuf(stdout);
+
+       Parser_init("lfs-pcc > ", pcc_cmdlist);
+
+       snprintf(cmd, sizeof(cmd), "%s %s", progname, argv[0]);
+       progname = cmd;
+       program_invocation_short_name = cmd;
+       if (argc > 1)
+               rc = Parser_execarg(argc - 1, argv + 1, pcc_cmdlist);
+       else
+               rc = Parser_commands();
+
+       return rc < 0 ? -rc : rc;
+}
+
 static int lfs_list_commands(int argc, char **argv)
 {
        char buffer[81] = ""; /* 80 printable chars + terminating NUL */
index 93e72be..ce1e48e 100644 (file)
@@ -1243,6 +1243,16 @@ static int ct_restore(const struct hsm_action_item *hai, const long hal_flags)
                goto fini;
        }
 
+       /* When restore request for a file triggered by read/write/
+        * truncate operation from another client, it needs to detach
+        * the file first if it is PCC-attached.
+        */
+       rc = llapi_pcc_detach_fid_fd(opt.o_mnt_fd, &hai->hai_fid);
+       if (rc) {
+               CT_ERROR(rc, "cannot detach pcc for file '%s'", dst);
+               goto fini;
+       }
+
        dst_fd = llapi_hsm_action_get_fd(hcp);
        if (dst_fd < 0) {
                rc = dst_fd;
diff --git a/lustre/utils/liblustreapi_pcc.c b/lustre/utils/liblustreapi_pcc.c
new file mode 100644 (file)
index 0000000..8d23bf4
--- /dev/null
@@ -0,0 +1,383 @@
+/*
+ * LGPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the GNU Lesser General Public License
+ * (LGPL) version 2.1 or (at your discretion) any later version.
+ * (LGPL) version 2.1 accompanies this distribution, and is available at
+ * http://www.gnu.org/licenses/lgpl-2.1.html
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * LGPL HEADER END
+ */
+/*
+ * Copyright (c) 2017, DDN Storage Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ *
+ * lustreapi library for Persistent Client Cache.
+ *
+ * Author: Li Xi <lixi@ddn.com>
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <lustre/lustreapi.h>
+#include <linux/lustre/lustre_user.h>
+#include <linux/lustre/lustre_fid.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/ioctl.h>
+#include "lustreapi_internal.h"
+
+/**
+ * Fetch and attach a file to readwrite PCC.
+ *
+ */
+static int llapi_readwrite_pcc_attach(const char *path, __u32 archive_id)
+{
+       int fd;
+       int rc;
+       struct ll_ioc_lease *data;
+
+       fd = open(path, O_RDWR | O_NONBLOCK);
+       if (fd < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc, "cannot open '%s'",
+                           path);
+               return rc;
+       }
+
+       rc = llapi_lease_acquire(fd, LL_LEASE_WRLCK);
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot get lease for '%s'", path);
+               goto out_close;
+       }
+
+       data = malloc(offsetof(typeof(*data), lil_ids[1]));
+       if (!data) {
+               rc = -ENOMEM;
+               llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                 "failed to allocate memory");
+               goto out_close;
+       }
+
+       data->lil_mode = LL_LEASE_UNLCK;
+       data->lil_flags = LL_LEASE_PCC_ATTACH;
+       data->lil_count = 1;
+       data->lil_ids[0] = archive_id;
+       rc = llapi_lease_set(fd, data);
+       if (rc <= 0) {
+               if (rc == 0) /* lost lease lock */
+                       rc = -EBUSY;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot attach '%s' with ID: %u",
+                            path, archive_id);
+       } else {
+               rc = 0;
+       }
+
+       free(data);
+out_close:
+       close(fd);
+       return rc;
+}
+
+int llapi_pcc_attach(const char *path, __u32 id, enum lu_pcc_type type)
+{
+       int rc;
+
+       switch (type) {
+       case LU_PCC_READWRITE:
+               rc = llapi_readwrite_pcc_attach(path, id);
+               break;
+       default:
+               rc = -EINVAL;
+               break;
+       }
+       return rc;
+}
+
+
+/**
+ * detach PCC cache of a file by an ioctl on the dir fd (usually a mount
+ * point fd that the copytool already has open).
+ *
+ * If the file is being used, the detaching will return -EBUSY immediately.
+ * Thus, if a PCC-attached file is kept open for a long time, the restore
+ * request will always return failure.
+ *
+ * \param fd           Directory file descriptor.
+ * \param fid          FID of the file.
+ *
+ * \return 0 on success, a negative errno value otherwise.
+ */
+int llapi_pcc_detach_fid_fd(int fd, const struct lu_fid *fid)
+{
+       int rc;
+       struct lu_pcc_detach detach;
+
+       detach.pccd_fid = *fid;
+       rc = ioctl(fd, LL_IOC_PCC_DETACH, &detach);
+       /* ioctl() reports failure as -1 with the cause in errno, so convert
+        * it to a negative errno before testing for specific error codes.
+        */
+       rc = rc ? -errno : 0;
+       if (rc == -EAGAIN)
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "FID "DFID" may be in the attaching state, "
+                           "or you may need to re-run the pcc_attach "
+                           "to finish the attach process.", PFID(fid));
+       else if (rc)
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot detach FID "DFID" from PCC", PFID(fid));
+
+       return rc;
+}
+
+/**
+ * detach PCC cache of a file.
+ *
+ * \param mntpath      Fullpath to the client mount point.
+ * \param fid          FID of the file.
+ *
+ * \return 0 on success, an error code otherwise.
+ */
+int llapi_pcc_detach_fid(const char *mntpath, const struct lu_fid *fid)
+{
+       int rc;
+       int fd;
+
+       rc = get_root_path(WANT_FD, NULL, &fd, (char *)mntpath, -1);
+       if (rc) {
+               llapi_error(LLAPI_MSG_ERROR, rc, "cannot get root path: %s",
+                           mntpath);
+               return rc;
+       }
+
+       rc = llapi_pcc_detach_fid_fd(fd, fid);
+
+       close(fd);
+       return rc;
+}
+
+/**
+ * detach PCC cache of a file.
+ *
+ * \param mntpath      Fullpath to the client mount point.
+ * \param fid          FID string of the file.
+ *
+ * \return 0 on success, an error code otherwise.
+ */
+int llapi_pcc_detach_fid_str(const char *mntpath, const char *fidstr)
+{
+       int rc;
+       struct lu_fid fid;
+       const char *fidstr_orig = fidstr;
+
+       while (*fidstr == '[')
+               fidstr++;
+       rc = sscanf(fidstr, SFID, RFID(&fid));
+       if (rc != 3 || !fid_is_sane(&fid)) {
+               llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                 "bad FID format '%s', should be [seq:oid:ver]"
+                                 " (e.g. "DFID")\n", fidstr_orig,
+                                 (unsigned long long)FID_SEQ_NORMAL, 2, 0);
+               return -EINVAL;
+       }
+
+       rc = llapi_pcc_detach_fid(mntpath, &fid);
+
+       return rc;
+}
+
+/**
+ * detach PCC cache of a file.
+ *
+ * \param path   Fullpath to the file to operate on.
+ *
+ * \return 0 on success, an error code otherwise.
+ */
+int llapi_pcc_detach_file(const char *path)
+{
+       int rc;
+       lustre_fid fid;
+
+       rc = llapi_path2fid(path, &fid);
+       if (rc) {
+               llapi_error(LLAPI_MSG_ERROR, rc, "cannot get FID of '%s'",
+                           path);
+               return rc;
+       }
+
+       rc = llapi_pcc_detach_fid(path, &fid);
+       return rc;
+}
+
+/**
+ * Return the current PCC state related to a file.
+ *
+ * \param fd   File handle.
+ * \param state        PCC state info.
+ *
+ * \return 0 on success, an error code otherwise.
+ */
+int llapi_pcc_state_get_fd(int fd, struct lu_pcc_state *state)
+{
+       int rc;
+
+       rc = ioctl(fd, LL_IOC_PCC_STATE, state);
+       /* If error, save errno value */
+       rc = rc ? -errno : 0;
+
+       return rc;
+}
+
+/**
+ * Return the current PCC state related to file pointed by a path.
+ *
+ * see llapi_pcc_state_get_fd() for args use and return
+ */
+int llapi_pcc_state_get(const char *path, struct lu_pcc_state *state)
+{
+       int fd;
+       int rc;
+
+       fd = open(path, O_RDONLY | O_NONBLOCK);
+       if (fd < 0)
+               return -errno;
+
+       rc = llapi_pcc_state_get_fd(fd, state);
+
+       close(fd);
+       return rc;
+}
+
+/**
+ * Add/delete a PCC backend on a client.
+ *
+ * Looks up the name llapi_getname() returns for \a mntpath and writes
+ * \a cmd to the matching "llite/<name>/pcc" parameter file.
+ *
+ * \param mntpath      Path passed to llapi_getname() to find the client.
+ * \param cmd          Command string written to the parameter file.
+ *
+ * \return 0 on success, a negative errno value otherwise.
+ */
+int llapi_pccdev_set(const char *mntpath, const char *cmd)
+{
+       char buf[sizeof(struct obd_uuid)];
+       size_t len = strlen(cmd);
+       glob_t path;
+       ssize_t count;
+       int fd;
+       int rc;
+
+       rc = llapi_getname(mntpath, buf, sizeof(buf));
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot get name for '%s'\n", mntpath);
+               return rc;
+       }
+
+       rc = cfs_get_param_paths(&path, "llite/%s/pcc", buf);
+       if (rc != 0)
+               return -errno;
+
+       fd = open(path.gl_pathv[0], O_WRONLY);
+       if (fd < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc, "error opening %s",
+                           path.gl_pathv[0]);
+               goto out;
+       }
+
+       count = write(fd, cmd, len);
+       if (count < 0) {
+               /* Keep the sign convention of the other error paths:
+                * callers test for rc < 0 to detect failure.
+                */
+               rc = -errno;
+               if (rc != -EIO)
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "error: setting llite.%s.pcc=\"%s\"\n",
+                                   buf, cmd);
+       } else if ((size_t)count < len) { /* Truncate case */
+               rc = -EINVAL;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "setting llite.%s.pcc=\"%s\": wrote only %zd\n",
+                           buf, cmd, count);
+       }
+       close(fd);
+out:
+       cfs_free_param_data(&path);
+       return rc;
+}
+
+/**
+ * List all PCC backend devices on a client.
+ */
+int llapi_pccdev_get(const char *mntpath)
+{
+       long page_size = sysconf(_SC_PAGESIZE);
+       char pathbuf[sizeof(struct obd_uuid)];
+       glob_t path;
+       char *buf;
+       int fd;
+       int rc;
+
+       rc = llapi_getname(mntpath, pathbuf, sizeof(pathbuf));
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot get name for '%s'\n", mntpath);
+               return rc;
+       }
+
+       rc = cfs_get_param_paths(&path, "llite/%s/pcc", pathbuf);
+       if (rc != 0)
+               return -errno;
+
+       /* Read the contents of file to stdout */
+       fd = open(path.gl_pathv[0], O_RDONLY);
+       if (fd < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "error: pccdev_get: opening '%s'\n",
+                           path.gl_pathv[0]);
+               goto out_free_param;
+       }
+
+       buf = calloc(1, page_size);
+       if (buf == NULL) {
+               rc = -ENOMEM;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "error: pccdev_get: allocating '%s' buffer\n",
+                           path.gl_pathv[0]);
+               goto out_close;
+       }
+
+       while (1) {
+               ssize_t count = read(fd, buf, page_size);
+
+               if (count == 0)
+                       break;
+               if (count < 0) {
+                       rc = -errno;
+                       if (errno != EIO) {
+                               llapi_error(LLAPI_MSG_ERROR, rc,
+                                           "error: pccdev_get: "
+                                           "reading failed\n");
+                       }
+                       break;
+               }
+
+               if (fwrite(buf, 1, count, stdout) != count) {
+                       rc = -errno;
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "error: get_param: write to stdout\n");
+                       break;
+               }
+       }
+out_close:
+       close(fd);
+       free(buf);
+out_free_param:
+       cfs_free_param_data(&path);
+       return rc;
+}
index 2f81597..15a63b5 100644 (file)
@@ -4861,3 +4861,119 @@ int jt_changelog_deregister(int argc, char **argv)
 
        return 0;
 }
+
+/**
+ * jt_pcc_add() - Handler for "lctl pcc add".
+ * @argc: The count of command line arguments.
+ * @argv: Array of strings for command line arguments.
+ *
+ * Parses the --param|-p option, then expects exactly two positional
+ * arguments (mount path and PCC path).  Builds the command string
+ * "add <pccpath> <param>" and passes it to llapi_pccdev_set().
+ *
+ * Return: CMD_HELP on a usage error, otherwise the value returned by
+ * llapi_pccdev_set().
+ */
+int jt_pcc_add(int argc, char **argv)
+{
+       struct option long_opts[] = {
+               { .val = 'p', .name = "param", .has_arg = required_argument },
+               { .name = NULL } };
+       const char *mntpath;
+       const char *pccpath;
+       char *param = NULL;
+       char cmd[PATH_MAX];
+       int rc;
+
+       optind = 1;
+       while ((rc = getopt_long(argc, argv, "p:",
+               long_opts, NULL)) != -1) {
+               switch (rc) {
+               case 'p':
+                       param = optarg;
+                       break;
+               default:
+                       return CMD_HELP;
+               }
+       }
+
+       /* The parameter string is mandatory for this command. */
+       if (!param) {
+               fprintf(stderr, "%s: must specify the config param for PCC\n",
+                       jt_cmdname(argv[0]));
+               return CMD_HELP;
+       }
+
+       if (optind + 2 != argc) {
+               fprintf(stderr,
+                       "%s: must specify mount path and PCC path %d:%d\n",
+                       jt_cmdname(argv[0]), optind, argc);
+               return CMD_HELP;
+       }
+
+       mntpath = argv[optind++];
+       pccpath = argv[optind];
+
+       snprintf(cmd, PATH_MAX, "add %s %s", pccpath, param);
+       rc = llapi_pccdev_set(mntpath, cmd);
+       if (rc < 0)
+               fprintf(stderr, "%s: failed to run '%s' on %s\n",
+                       jt_cmdname(argv[0]), cmd, mntpath);
+
+       return rc;
+}
+
+int jt_pcc_del(int argc, char **argv)
+{
+       const char *mntpath;
+       const char *pccpath;
+       char cmd[PATH_MAX];
+       int rc;
+
+       optind = 1;
+       if (argc != 3) {
+               fprintf(stderr, "%s: require 3 arguments\n",
+                       jt_cmdname(argv[0]));
+               return CMD_HELP;
+       }
+
+       mntpath = argv[optind++];
+       pccpath = argv[optind++];
+
+       snprintf(cmd, PATH_MAX, "del %s", pccpath);
+       rc = llapi_pccdev_set(mntpath, cmd);
+       if (rc < 0)
+               fprintf(stderr, "%s: failed to run '%s' on %s\n",
+                       jt_cmdname(argv[0]), cmd, mntpath);
+
+       return rc;
+}
+
+int jt_pcc_clear(int argc, char **argv)
+{
+       const char *mntpath;
+       int rc;
+
+       optind = 1;
+       if (argc != 2) {
+               fprintf(stderr, "%s: require 2 arguments\n",
+                       jt_cmdname(argv[0]));
+               return CMD_HELP;
+       }
+
+       mntpath = argv[optind];
+       rc = llapi_pccdev_set(mntpath, "clear");
+       if (rc < 0)
+               fprintf(stderr, "%s: failed to run 'clear' on %s\n",
+                       jt_cmdname(argv[0]), mntpath);
+
+       return rc;
+}
+
+int jt_pcc_list(int argc, char **argv)
+{
+       const char *mntpath;
+       int rc;
+
+       optind = 1;
+       if (argc != 2) {
+               fprintf(stderr, "%s: require 2 arguments\n",
+                       jt_cmdname(argv[0]));
+               return CMD_HELP;
+       }
+
+       mntpath = argv[optind];
+       rc = llapi_pccdev_get(mntpath);
+       if (rc < 0)
+               fprintf(stderr, "%s: failed to run 'pcc list' on %s\n",
+                       jt_cmdname(argv[0]), mntpath);
+
+       return rc;
+}
index 8ca38a5..cf06edd 100644 (file)
@@ -194,6 +194,10 @@ int jt_nodemap_set_sepol(int argc, char **argv);
 int jt_nodemap_info(int argc, char **argv);
 int jt_changelog_register(int argc, char **argv);
 int jt_changelog_deregister(int argc, char **argv);
+int jt_pcc_add(int argc, char **argv);
+int jt_pcc_del(int argc, char **argv);
+int jt_pcc_clear(int argc, char **argv);
+int jt_pcc_list(int argc, char **argv);
 
 #ifdef HAVE_SERVER_SUPPORT
 /* lustre_lfsck.c */