Whamcloud - gitweb
LU-9538 utils: Tool for syncing file LSOM xattr 24/30124/21
authorQian Yingjin <qian@ddn.com>
Thu, 16 Nov 2017 01:42:57 +0000 (09:42 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 9 Aug 2018 18:20:15 +0000 (18:20 +0000)
Add a helper tool for syncing file LSOM xattr.
Firstly, register a new changelog user:
lctl --device lustre-MDT0000 changelog_register

After performing some file operations on the Lustre file system, run
this tool to sync file LSOM xattr:
llsom_sync -u cl1 -m lustre-MDT0000 /mnt/lustre

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: Ia2878b48f7f665b01b230585921c78ae41846171
Reviewed-on: https://review.whamcloud.com/30124
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Li Xi <lixi@ddn.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/include/libcfs/util/Makefile.am
libcfs/include/libcfs/util/hash.h [new file with mode: 0644]
libcfs/include/libcfs/util/parser.h
libcfs/libcfs/util/parser.c
lustre/doc/Makefile.am
lustre/doc/llsom_sync.8 [new file with mode: 0644]
lustre/tests/sanity.sh
lustre/tests/test-framework.sh
lustre/utils/Makefile.am
lustre/utils/llsom_sync.c [new file with mode: 0644]

index 7d332a8..de35727 100644 (file)
@@ -1 +1 @@
-EXTRA_DIST = ioctl.h parser.h param.h list.h string.h
+EXTRA_DIST = ioctl.h parser.h param.h list.h string.h hash.h
diff --git a/libcfs/include/libcfs/util/hash.h b/libcfs/include/libcfs/util/hash.h
new file mode 100644 (file)
index 0000000..45818dd
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+
+#ifndef _LINUX_HASH_H
+#define _LINUX_HASH_H
+/* Fast hashing routine for ints,  longs and pointers.
+   (C) 2002 Nadia Yvette Chambers, IBM */
+
+/*
+ * Knuth recommends primes in approximately golden ratio to the maximum
+ * integer representable by a machine word for multiplicative hashing.
+ * Chuck Lever verified the effectiveness of this technique:
+ * http://www.citi.umich.edu/techreports/reports/citi-tr-00-1.pdf
+ *
+ * These primes are chosen to be bit-sparse, that is operations on
+ * them can use shifts and additions instead of multiplications for
+ * machines where multiplications are slow.
+ */
+
+#include <linux/types.h>
+
+/* 2^31 + 2^29 - 2^25 + 2^22 - 2^19 - 2^16 + 1 */
+#define GOLDEN_RATIO_PRIME_32 0x9e370001UL
+/*  2^63 + 2^61 - 2^57 + 2^54 - 2^51 - 2^18 + 1 */
+#define GOLDEN_RATIO_PRIME_64 0x9e37fffffffc0001UL
+
+#if __BITS_PER_LONG == 32
+#define GOLDEN_RATIO_PRIME GOLDEN_RATIO_PRIME_32
+#define hash_long(val, bits) hash_32(val, bits)
+#elif __BITS_PER_LONG == 64
+#define hash_long(val, bits) hash_64(val, bits)
+#define GOLDEN_RATIO_PRIME GOLDEN_RATIO_PRIME_64
+#else
+#error Wordsize not 32 or 64
+#endif
+
+/*
+ * Hash a 64-bit value down to its top \a bits bits.
+ *
+ * The shift/add/subtract sequence below multiplies \a val by
+ * GOLDEN_RATIO_PRIME_64 (2^63 + 2^61 - 2^57 + 2^54 - 2^51 - 2^18 + 1)
+ * modulo 2^64 without using a multiply instruction.
+ *
+ * \a bits must be in the range 1..64: bits == 0 would shift by 64,
+ * which is undefined in C.
+ */
+static __always_inline __u64 hash_64(__u64 val, unsigned int bits)
+{
+       __u64 hash = val;
+
+       /*  Sigh, gcc can't optimise this alone like it does for 32 bits. */
+       __u64 n = hash;
+       /* n = val * 2^18 */
+       n <<= 18;
+       hash -= n;
+       /* n = val * 2^51 */
+       n <<= 33;
+       hash -= n;
+       /* n = val * 2^54 */
+       n <<= 3;
+       hash += n;
+       /* n = val * 2^57 */
+       n <<= 3;
+       hash -= n;
+       /* n = val * 2^61 */
+       n <<= 4;
+       hash += n;
+       /* n = val * 2^63 */
+       n <<= 2;
+       hash += n;
+
+       /* High bits are more random, so use them. */
+       return hash >> (64 - bits);
+}
+
+/*
+ * Hash a 32-bit value down to \a bits bits: multiply by
+ * GOLDEN_RATIO_PRIME_32, truncate the product to 32 bits and keep its
+ * top \a bits bits.
+ */
+static inline __u32 hash_32(__u32 val, unsigned int bits)
+{
+       const unsigned int shift = 32 - bits;
+       __u32 product = val;
+
+       /* the compiler is free to turn this multiply into shifts and adds */
+       product *= GOLDEN_RATIO_PRIME_32;
+
+       /* the high-order bits are the best mixed ones */
+       return product >> shift;
+}
+
+/* Hash a pointer value down to \a bits bits using hash_long(). */
+static inline unsigned long hash_ptr(const void *ptr, unsigned int bits)
+{
+       unsigned long addr = (unsigned long)ptr;
+
+       return hash_long(addr, bits);
+}
+
+/*
+ * Reduce a pointer to 32 bits.  On 64-bit builds the upper half of the
+ * address is XOR-folded into the lower half before truncating.
+ */
+static inline __u32 hash32_ptr(const void *ptr)
+{
+       unsigned long addr = (unsigned long)ptr;
+
+#if __BITS_PER_LONG == 64
+       addr = addr ^ (addr >> 32);
+#endif
+       return (__u32)addr;
+}
+
+#endif /* _LINUX_HASH_H */
index 1f75e0b..7bae839 100644 (file)
@@ -107,7 +107,7 @@ char *Parser_strarg(char *inp, const char *prompt, const char *deft,
 int Parser_arg2int(const char *inp, long *result, int base);
 
 /* Convert human readable size string to and int; "1k" -> 1000 */
 int Parser_arg2int(const char *inp, long *result, int base);
 
 /* Convert human readable size string to and int; "1k" -> 1000 */
-int Parser_size(int *sizep, char *str);
+int Parser_size(unsigned long *sizep, char *str);
 
 /* Convert a string boolean to an int; "enable" -> 1 */
 int Parser_bool(int *b, char *str);
 
 /* Convert a string boolean to an int; "enable" -> 1 */
 int Parser_bool(int *b, char *str);
index ec19dd1..861f97a 100644 (file)
@@ -768,40 +768,41 @@ int Parser_arg2int(const char *inp, long *result, int base)
 }
 
 /* Convert human readable size string to and int; "1k" -> 1000 */
 }
 
 /* Convert human readable size string to and int; "1k" -> 1000 */
-int Parser_size (int *sizep, char *str) {
-        int size;
-        char mod[32];
+int Parser_size(unsigned long *sizep, char *str)
+{
+       unsigned long size;
+       char mod[32];
 
 
-        switch (sscanf (str, "%d%1[gGmMkK]", &size, mod)) {
-        default:
-                return (-1);
+       switch (sscanf(str, "%lu%1[gGmMkK]", &size, mod)) {
+       default:
+               return -1;
 
 
-        case 1:
-                *sizep = size;
-                return (0);
+       case 1:
+               *sizep = size;
+               return 0;
 
 
-        case 2:
-                switch (*mod) {
-                case 'g':
-                case 'G':
-                        *sizep = size << 30;
-                        return (0);
-
-                case 'm':
-                case 'M':
-                        *sizep = size << 20;
-                        return (0);
-
-                case 'k':
-                case 'K':
-                        *sizep = size << 10;
-                        return (0);
-
-                default:
-                        *sizep = size;
-                        return (0);
-                }
-        }
+       case 2:
+               switch (*mod) {
+               case 'g':
+               case 'G':
+                       *sizep = size << 30;
+                       return 0;
+
+               case 'm':
+               case 'M':
+                       *sizep = size << 20;
+                       return 0;
+
+               case 'k':
+               case 'K':
+                       *sizep = size << 10;
+                       return 0;
+
+               default:
+                       *sizep = size;
+                       return 0;
+               }
+       }
 }
 
 /* Convert a string boolean to an int; "enable" -> 1 */
 }
 
 /* Convert a string boolean to an int; "enable" -> 1 */
index ffdba26..b446a29 100644 (file)
@@ -123,6 +123,7 @@ MANFILES =                                  \
        lustre_routes_config.8                  \
        lustre_routes_conversion.8              \
        lustre_rsync.8                          \
        lustre_routes_config.8                  \
        lustre_routes_conversion.8              \
        lustre_rsync.8                          \
+       llsom_sync.8                            \
        mount.lustre.8                          \
        nids.5                                  \
        plot-llstat.8                           \
        mount.lustre.8                          \
        nids.5                                  \
        plot-llstat.8                           \
diff --git a/lustre/doc/llsom_sync.8 b/lustre/doc/llsom_sync.8
new file mode 100644 (file)
index 0000000..083a848
--- /dev/null
@@ -0,0 +1,92 @@
+.TH llsom_sync 8 "2018 Jan 10" Lustre "Lustre Filesystem utility"
+.SH NAME
+llsom_sync \- Utility to sync file LSOM xattr.
+.SH SYNOPSIS
+.br
+.B llsom_sync --mdt|-m <mdt>  --user|-u <user id>
+.br
+.B\t\t\t [--daemonize|-d] [--verbose|-v] [--interval|-i]
+.br
+.B\t\t\t [--min-age|-a] [--max-cache|-c] [--sync|-s] <lustre_mount_point>
+.br
+
+.SH DESCRIPTION
+.B llsom_sync
+is designed to sync file LSOM xattr in the Lustre Filesystem by
+using Lustre MDT changelogs.  A changelog user must be registered
+(see lctl (8) changelog_register) before using this tool.
+
+.SH OPTIONS
+
+.B --mdt=<mdt>
+.br
+The metadata device (MDT) whose files need their LSOM xattr synced.
+A changelog user must be registered for this device.
+
+.B --user=<user id>
+.br
+The changelog user id for the above MDT device. See lctl(8) changelog_register.
+
+.B --daemonize
+.br
+Daemonize the program. In daemon mode, the utility will scan, process the
+changelog records and sync the LSOM xattr for files periodically.
+
+.B --verbose
+.br
+Produce a verbose output.
+
+.B --interval
+.br
+The time interval to scan the Lustre changelog and process the log record in
+daemon mode.
+
+.B --min-age
+.br
+The minimum age of a record, in seconds: llsom_sync will not try to sync the
+SOM data of any file that was closed less than this many seconds ago. The
+default min-age value is 600s (10 minutes).
+
+.B --max-cache
+.br
+The total memory used for the FID cache; the value may carry a suffix
+[KkGgMm]. The default max-cache value is 256MB. A value < 100 is not taken
+as a cache size but as the percentage of the total memory size to use for
+the FID cache.
+
+.B --sync
+.br
+Sync file data to flush dirty data out of the cache, ensuring that the block
+count is correct when the file LSOM xattr is updated. This option could hurt
+server performance significantly if thousands of fsync requests are sent.
+
+.SH EXAMPLES
+
+.TP
+Register a changelog consumer for MDT lustre-MDT0000
+$ cl_user=$(ssh root@mds01 lctl changelog_register --device lustre-MDT0000 -n)
+.br
+$ echo $cl_user
+.br
+cl1
+
+.TP
+After performing some file operations on the Lustre filesystem mounted
+at '/mnt/lustre', so that the filesystem has undergone some changes,
+sync the file LSOM xattr:
+$ llsom_sync --mdt=lustre-MDT0000 --user=$cl_user \\
+.br
+             --verbose /mnt/lustre
+
+.TP
+To deregister the changelog user (e.g. after this example, or if SOM
+updates are no longer needed):
+.br
+$ ssh root@mds01 lctl changelog_deregister --device lustre-MDT0000 $cl_user
+
+.SH AUTHOR
+The llsom_sync command is part of the Lustre filesystem.
+
+.SH SEE ALSO
+.BR lustre (7),
+.BR lctl (8),
index 893740f..dda88ca 100755 (executable)
@@ -19443,6 +19443,62 @@ test_806() {
 }
 run_test 806 "Verify Lazy Size on MDS"
 
 }
 run_test 806 "Verify Lazy Size on MDS"
 
+# Verify the llsom_sync tool: register a changelog user, modify a few files
+# (truncate, single dd, one write per client to a shared file), run
+# $LSOM_SYNC against MDT0000 and check the LSOM data of each file.
+test_807() {
+       # skipped on an MDS older than 2.11.52
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52" && return
+
+       # Registration step
+       changelog_register || error "changelog_register failed"
+       local cl_user="${CL_USERS[$SINGLEMDS]%% *}"
+       changelog_users $SINGLEMDS | grep -q $cl_user ||
+               error "User $cl_user not found in changelog_users"
+
+       # disable the client xattr cache for the test; the saved setting is
+       # restored on exit through stack_trap
+       local save="$TMP/$TESTSUITE-$TESTNAME.parameters"
+       save_lustre_params client "llite.*.xattr_cache" > $save
+       lctl set_param llite.*.xattr_cache=0
+       stack_trap "restore_lustre_params < $save" EXIT
+
+       # a file truncated twice: first to 1024 bytes, then to 1MiB
+       rm -rf $DIR/$tdir || error "rm $tdir failed"
+       mkdir -p $DIR/$tdir || error "mkdir $tdir failed"
+       touch $DIR/$tdir/trunc || error "touch $tdir/trunc failed"
+       $TRUNCATE $DIR/$tdir/trunc 1024 || error "truncate $tdir/trunc failed"
+       $TRUNCATE $DIR/$tdir/trunc 1048576 ||
+               error "truncate $tdir/trunc failed"
+
+       # a file written with a single 1MiB dd
+       local bs=1048576
+       dd if=/dev/zero of=$DIR/$tdir/single_dd bs=$bs count=1 ||
+               error "write $tfile failed"
+
+       # multi-client writes
+       local num=$(get_node_count ${CLIENTS//,/ })
+       local offset=0
+       local i=0
+
+       echo "Test SOM for muti-client ($num) writes"
+       touch $DIR/$tfile || error "touch $tfile failed"
+       $TRUNCATE $DIR/$tfile 0
+       # each client writes $bs bytes at its own offset, in the background
+       for client in ${CLIENTS//,/ }; do
+               do_node $client $MULTIOP $DIR/$tfile Oz${offset}w${bs}c &
+               local pids[$i]=$!
+               i=$((i + 1))
+               offset=$((offset + $bs))
+       done
+       # wait for all background writers to finish
+       for (( i=0; i < $num; i++ )); do
+               wait ${pids[$i]}
+       done
+
+       # NOTE(review): the reason for this delay is not stated here;
+       # presumably it lets the changelog records settle - confirm
+       sleep 5
+       $LSOM_SYNC -u $cl_user -m $FSNAME-MDT0000 $MOUNT
+       check_lsom_data $DIR/$tdir/trunc
+       check_lsom_data $DIR/$tdir/single_dd
+       check_lsom_data $DIR/$tfile
+
+       rm -rf $DIR/$tdir
+       # Deregistration step
+       changelog_deregister || error "changelog_deregister failed"
+}
+run_test 807 "verify LSOM syncing tool"
+
 #
 # tests that do cleanup/setup should be run at the end
 #
 #
 # tests that do cleanup/setup should be run at the end
 #
index 98b107e..935708b 100755 (executable)
@@ -304,6 +304,10 @@ init_test_env() {
        [ ! -f "$LR_READER" ] &&
                export LR_READER=$(which lr_reader 2> /dev/null)
        [ -z "$LR_READER" ] && export LR_READER="/usr/sbin/lr_reader"
        [ ! -f "$LR_READER" ] &&
                export LR_READER=$(which lr_reader 2> /dev/null)
        [ -z "$LR_READER" ] && export LR_READER="/usr/sbin/lr_reader"
+       export LSOM_SYNC=${LSOM_SYNC:-"$LUSTRE/utils/llsom_sync"}
+       [ ! -f "$LSOM_SYNC" ] &&
+               export LSOM_SYNC=$(which llsom_sync 2> /dev/null)
+       [ -z "$LSOM_SYNC" ] && export LSOM_SYNC="/usr/sbin/llsom_sync"
        export NAME=${NAME:-local}
        export LGSSD=${LGSSD:-"$LUSTRE/utils/gss/lgssd"}
        [ "$GSS_PIPEFS" = "true" ] && [ ! -f "$LGSSD" ] &&
        export NAME=${NAME:-local}
        export LGSSD=${LGSSD:-"$LUSTRE/utils/gss/lgssd"}
        [ "$GSS_PIPEFS" = "true" ] && [ ! -f "$LGSSD" ] &&
index 82d1c40..9d68b2f 100644 (file)
@@ -27,7 +27,8 @@ rootsbin_PROGRAMS = mount.lustre
 bin_SCRIPTS   = llstat llobdstat plot-llstat
 bin_PROGRAMS  = lfs
 sbin_SCRIPTS  = ldlm_debug_upcall
 bin_SCRIPTS   = llstat llobdstat plot-llstat
 bin_PROGRAMS  = lfs
 sbin_SCRIPTS  = ldlm_debug_upcall
-sbin_PROGRAMS = lctl l_getidentity llverfs lustre_rsync ll_decode_linkea
+sbin_PROGRAMS = lctl l_getidentity llverfs lustre_rsync ll_decode_linkea \
+               llsom_sync
 
 if TESTS
 sbin_PROGRAMS += wiretest
 
 if TESTS
 sbin_PROGRAMS += wiretest
@@ -68,6 +69,9 @@ lustre_rsync_SOURCES = lustre_rsync.c lustre_rsync.h callvpe.c callvpe.h
 lustre_rsync_LDADD :=  liblustreapi.la $(PTHREAD_LIBS)
 lustre_rsync_DEPENDENCIES := liblustreapi.la
 
 lustre_rsync_LDADD :=  liblustreapi.la $(PTHREAD_LIBS)
 lustre_rsync_DEPENDENCIES := liblustreapi.la
 
+llsom_sync_LDADD := liblustreapi.la
+llsom_sync_DEPENDENCIES := liblustreapi.la
+
 lshowmount_SOURCES = lshowmount.c nidlist.c nidlist.h
 lshowmount_LDADD :=  liblustreapi.la
 
 lshowmount_SOURCES = lshowmount.c nidlist.c nidlist.h
 lshowmount_LDADD :=  liblustreapi.la
 
diff --git a/lustre/utils/llsom_sync.c b/lustre/utils/llsom_sync.c
new file mode 100644 (file)
index 0000000..9676ded
--- /dev/null
@@ -0,0 +1,654 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2017, DDN Storage Corporation.
+ */
+/*
+ * lustre/utils/llsom_sync.c
+ *
+ * Tool for sync the LSOM xattr.
+ *
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+
+#include <stdlib.h>
+#include <errno.h>
+#include <getopt.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <assert.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <time.h>
+#include <linux/unistd.h>
+#include <linux/kernel.h>
+#include <sys/sysinfo.h>
+#include <linux/lustre/lustre_user.h>
+#include <lustre/lustreapi.h>
+#include <linux/lustre/lustre_idl.h>
+#include <linux/lustre/lustre_fid.h>
+#include <libcfs/util/hash.h>
+#include <libcfs/util/list.h>
+#include <libcfs/util/parser.h>
+
+#ifndef PATH_MAX
+#define PATH_MAX (4096)
+#endif
+
+#define container_of(ptr, type, member) ({                      \
+       const typeof(((type *) 0)->member) * __mptr = (ptr);     \
+       (type *) ((char *) __mptr - offsetof(type, member)); })
+
+#define CHLG_POLL_INTV 60
+#define REC_MIN_AGE    600
+#define DEF_CACHE_SIZE (256 * 1048576) /* 256MB */
+
+/* Run-time settings of the tool; main() fills them from the command line. */
+struct options {
+       /* changelog user id, from -u/--user */
+       const char      *o_chlg_user;
+       /* MDT device name, from -m/--mdt */
+       const char      *o_mdtname;
+       /* Lustre mount point, the single non-option argument */
+       const char      *o_mntpt;
+       /* set by -d/--daemonize */
+       bool             o_daemonize;
+       /* set by -s/--sync: flush file data before reading its attributes */
+       bool             o_data_sync;
+       /* llapi message level; starts at LLAPI_MSG_INFO, +1 for each -v */
+       int              o_verbose;
+       /* value of -i/--interval, defaults to CHLG_POLL_INTV */
+       int              o_intv;
+       /* value of -a/--min-age in seconds, defaults to REC_MIN_AGE */
+       int              o_min_age;
+       /* high watermark: once lh_cached_count exceeds it, lsom_check_sync()
+        * syncs a batch of o_batch_sync_cnt records
+        */
+       unsigned long    o_cached_fid_hiwm; /* high watermark */
+       /* number of records synced at once when above the high watermark */
+       unsigned long    o_batch_sync_cnt;
+};
+
+struct options opt;
+
+/* One cached FID waiting for its LSOM data to be synced. */
+struct fid_rec {
+       /* link in a bucket of head.lh_hash, see fid_hash_add() */
+       struct hlist_node       fr_node;
+       /* link in head.lh_list, which is ordered by fr_index */
+       struct list_head        fr_link;
+       /* target FID (cr_tfid) of the changelog record */
+       lustre_fid              fr_fid;
+       /* cr_time of the changelog record that created this entry */
+       __u64                   fr_time;
+       /* index of the latest changelog record seen for this FID */
+       __u64                   fr_index;
+};
+
+static const int fid_hash_shift = 6;
+
+#define FID_HASH_ENTRIES       (1 << fid_hash_shift)
+#define FID_ON_HASH(f)         (!hlist_unhashed(&(f)->fr_node))
+
+#if __BITS_PER_LONG == 32
+#define FID_HASH_FN(f) (hash_long(fid_flatten32(f), fid_hash_shift))
+#elif __BITS_PER_LONG == 64
+#define FID_HASH_FN(f) (hash_long(fid_flatten(f), fid_hash_shift))
+#else
+#error Wordsize not 32 or 64
+#endif
+
+/* Global cache of FIDs collected from the changelog. */
+struct lsom_head {
+       /* hash table of FID_HASH_ENTRIES buckets, allocated by lsom_setup() */
+       struct hlist_head       *lh_hash;
+       struct list_head         lh_list; /* ordered list by record index */
+       /* number of fid_rec entries currently linked on lh_list */
+       unsigned long            lh_cached_count;
+} head;
+
+/**
+ * Print the command line help to stdout and terminate the process.
+ *
+ * \param[in] prog     program name shown in the usage line
+ *
+ * This function does not return: it always calls exit(0).
+ */
+static void usage(char *prog)
+{
+       printf("\nUsage: %s [options] -u <userid> -m <mdtdev> <mntpt>\n"
+              "options:\n"
+              "\t-d, --daemonize\n"
+              "\t-i, --interval, poll interval in seconds\n"
+              "\t-a, --min-age, min age before a record is processed.\n"
+              "\t-c, --max-cache, memory used for cache; a value < 100 is\n"
+              "\t\ttaken as a percentage of the total memory.\n"
+              "\t-s, --sync, data sync when update LSOM xattr\n"
+              "\t-v, --verbose, produce more verbose output\n",
+              prog);
+       exit(0);
+}
+
+/**
+ * Flatten a FID into a single 64-bit value that is used as hash input.
+ *
+ * \param[in] fid      FID to flatten
+ *
+ * \retval the value of lu_igif_ino() for an IGIF FID, otherwise a mix
+ *         of the sequence and the object id; when that mix is zero the
+ *         object id alone is returned
+ */
+static inline __u64 fid_flatten(const struct lu_fid *fid)
+{
+       __u64 seq;
+       __u64 flat;
+
+       /* IGIF FIDs: use the inode number reported by lu_igif_ino() */
+       if (fid_is_igif(fid))
+               return lu_igif_ino(fid);
+
+       seq = fid_seq(fid);
+       flat = (seq << 24) + ((seq >> 24) & 0xffffff0000ULL) + fid_oid(fid);
+       if (flat != 0)
+               return flat;
+
+       return fid_oid(fid);
+}
+
+/**
+ * Map a FID to a 32-bit value; used as the hash input on 32-bit systems
+ * (see FID_HASH_FN).
+ *
+ * \param[in] fid      FID to flatten
+ *
+ * \retval the value of lu_igif_ino() truncated to 32 bits for an IGIF
+ *         FID, otherwise a mix of the sequence (relative to
+ *         FID_SEQ_START) and the object id; when that mix is zero the
+ *         object id alone is returned
+ */
+static inline __u32 fid_flatten32(const struct lu_fid *fid)
+{
+       __u32 ino;
+       __u64 seq;
+
+       if (fid_is_igif(fid)) {
+               ino = lu_igif_ino(fid);
+               return ino;
+       }
+
+       seq = fid_seq(fid) - FID_SEQ_START;
+
+       /* Map the high bits of the OID into higher bits of the inode number so
+        * that inodes generated at about the same time have a reduced chance
+        * of collisions. This will give a period of 2^12 = 1024 unique clients
+        * (from SEQ) and up to min(LUSTRE_SEQ_MAX_WIDTH, 2^20) = 128k objects
+        * (from OID), or up to 128M inodes without collisions for new files.
+        *
+        * NOTE(review): 2^12 is 4096, not 1024, and 2^20 is 1M, not 128k;
+        * the figures quoted above look inconsistent - confirm against
+        * the original source of this formula.
+        */
+       ino = ((seq & 0x000fffffULL) << 12) + ((seq >> 8) & 0xfffff000) +
+             (seq >> (64 - (40-8)) & 0xffffff00) +
+             (fid_oid(fid) & 0xff000fff) + ((fid_oid(fid) & 0x00fff000) << 8);
+
+       return ino ?: fid_oid(fid);
+}
+
+/* Return true when both FIDs have the same sequence, object id and version. */
+static inline bool fid_eq(const lustre_fid *f1, const lustre_fid *f2)
+{
+       if (f1->f_seq != f2->f_seq)
+               return false;
+       if (f1->f_oid != f2->f_oid)
+               return false;
+
+       return f1->f_ver == f2->f_ver;
+}
+
+/* Unlink \a f from the FID hash table; a no-op when it is not hashed. */
+static void fid_hash_del(struct fid_rec *f)
+{
+       if (!FID_ON_HASH(f))
+               return;
+
+       hlist_del_init(&f->fr_node);
+}
+
+/*
+ * Insert \a f at the head of the hash bucket selected by its FID.
+ * The record must not be on the hash table yet.
+ */
+static void fid_hash_add(struct fid_rec *f)
+{
+       struct hlist_head *bucket;
+
+       assert(!FID_ON_HASH(f));
+
+       bucket = &head.lh_hash[FID_HASH_FN(&f->fr_fid)];
+       hlist_add_head(&f->fr_node, bucket);
+}
+
+/**
+ * Look up the cached record of a FID.
+ *
+ * \param[in] fid      FID to search for
+ *
+ * \retval the matching record, or NULL when the FID is not cached
+ */
+static struct fid_rec *fid_hash_find(const lustre_fid *fid)
+{
+       struct hlist_head *bucket = &head.lh_hash[FID_HASH_FN(fid)];
+       struct hlist_node *cur, *tmp;
+       struct fid_rec *rec;
+
+       hlist_for_each_entry_safe(rec, cur, tmp, bucket, fr_node) {
+               assert(FID_ON_HASH(rec));
+               if (!fid_eq(fid, &rec->fr_fid))
+                       continue;
+
+               return rec;
+       }
+
+       return NULL;
+}
+
+/**
+ * Set the llapi message level from opt.o_verbose and initialise the
+ * global FID cache: an empty hash table of FID_HASH_ENTRIES buckets and
+ * an empty record list.
+ *
+ * \retval 0 on success
+ * \retval -ENOMEM when the hash table cannot be allocated
+ */
+static int lsom_setup(void)
+{
+       const size_t hash_bytes =
+               sizeof(struct hlist_head) * FID_HASH_ENTRIES;
+       struct hlist_head *bucket;
+
+       /* set llapi message level */
+       llapi_msg_set_level(opt.o_verbose);
+
+       memset(&head, 0, sizeof(head));
+       head.lh_hash = malloc(hash_bytes);
+       if (!head.lh_hash) {
+               llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                "failed to alloc memory for hash (%zu).",
+                                hash_bytes);
+               return -ENOMEM;
+       }
+
+       for (bucket = head.lh_hash;
+            bucket < head.lh_hash + FID_HASH_ENTRIES; bucket++)
+               INIT_HLIST_HEAD(bucket);
+
+       INIT_LIST_HEAD(&head.lh_list);
+       return 0;
+}
+
+/**
+ * Release the global FID cache: every record still linked on the list
+ * is unhashed and freed, then the hash table itself is freed.
+ *
+ * Safe to call when lsom_setup() failed or was never called: lsom_setup()
+ * only initialises the list after the hash table was allocated, so the
+ * list is left alone while head.lh_hash is NULL.
+ */
+static void lsom_cleanup(void)
+{
+       if (head.lh_hash != NULL) {
+               while (!list_empty(&head.lh_list)) {
+                       struct fid_rec *f;
+
+                       f = list_entry(head.lh_list.next, struct fid_rec,
+                                      fr_link);
+                       list_del_init(&f->fr_link);
+                       fid_hash_del(f);
+                       free(f);
+               }
+               head.lh_cached_count = 0;
+       }
+
+       free(head.lh_hash);
+       /* do not leave a dangling pointer behind */
+       head.lh_hash = NULL;
+}
+
+/**
+ * Trigger the LSOM update of one cached file and clear its changelog
+ * record.
+ *
+ * The file is opened by FID, optionally flushed (opt.o_data_sync),
+ * stat'ed and closed again; the changelog is then cleared up to the
+ * index stored in \a f.  A file that no longer exists (-ENOENT on open)
+ * is not an error: only its changelog record is cleared.
+ *
+ * \param[in] f        cached record holding the FID and the changelog index
+ *
+ * \retval 0 on success
+ * \retval negative errno when the open or the stat fails, or the return
+ *         code of llapi_changelog_clear() when clearing fails
+ */
+static int lsom_update_one(struct fid_rec *f)
+{
+       struct stat st;
+       int fd;
+       int rc = 0;
+
+       fd = llapi_open_by_fid(opt.o_mntpt, &f->fr_fid,
+                              O_RDONLY | O_NOATIME);
+       if (fd < 0) {
+               rc = -errno;
+
+               /* The file may be deleted, clean the corresponding
+                * changelog record and ignore this error.
+                */
+               if (rc == -ENOENT)
+                       goto clean_up;
+
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "llapi_open_by_fid for " DFID " failed",
+                           PFID(&f->fr_fid));
+               return rc;
+       }
+
+       if (opt.o_data_sync) {
+               __u64 dv;
+
+               /* Flush dirty pages from clients */
+               rc = llapi_get_data_version(fd, &dv, LL_DV_RD_FLUSH);
+               if (rc < 0)
+                       /* report the code that was returned rather than
+                        * errno, which may be stale by now
+                        */
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "failed to sync data for " DFID,
+                                   PFID(&f->fr_fid));
+               /* ignore this error, continue to sync lsom data */
+       }
+
+       if (fstat(fd, &st) < 0) {
+               /* fstat() returns -1; the real cause is in errno, which
+                * must be saved before close() can overwrite it
+                */
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc, "failed to stat FID: " DFID,
+                           PFID(&f->fr_fid));
+               /* do not leak the descriptor on the error path */
+               close(fd);
+               return rc;
+       }
+
+       /* After call fstat(), it already gets OST attrs to the client,
+        * when close the file, MDS will update the LSOM data itself
+        * according the size and blocks information from the client.
+        */
+       close(fd);
+
+       /* st_size and st_blocks are signed types, cast them for %lu */
+       llapi_printf(LLAPI_MSG_DEBUG,
+                    "record %llu:%llu, updated LSOM for fid " DFID
+                    " size:%lu blocks:%lu\n",
+                    (unsigned long long)f->fr_time,
+                    (unsigned long long)f->fr_index,
+                    PFID(&f->fr_fid), (unsigned long)st.st_size,
+                    (unsigned long)st.st_blocks);
+
+clean_up:
+       rc = llapi_changelog_clear(opt.o_mdtname,
+                                  opt.o_chlg_user, f->fr_index);
+       if (rc)
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "failed to clear changelog record: %s:%llu",
+                           opt.o_chlg_user,
+                           (unsigned long long)f->fr_index);
+       return rc;
+}
+
+/**
+ * Sync up to \a count records, starting from the head of the ordered
+ * record list.  Each record that was synced successfully is removed
+ * from the list and from the hash table, and freed.
+ *
+ * The loop stops early when the list runs empty, so a \a count larger
+ * than the number of cached records is harmless.
+ *
+ * \param[in] count    maximum number of records to sync
+ *
+ * \retval 0 on success
+ * \retval the error returned by lsom_update_one() for the first record
+ *         that failed; that record and the ones after it stay cached
+ */
+static int lsom_start_update(int count)
+{
+       int rc = 0;
+       int i;
+
+       llapi_printf(LLAPI_MSG_INFO, "Start to sync %d records.\n", count);
+
+       for (i = 0; i < count && !list_empty(&head.lh_list); i++) {
+               struct fid_rec *f;
+
+               f = list_entry(head.lh_list.next, struct fid_rec, fr_link);
+               rc = lsom_update_one(f);
+               if (rc != 0)
+                       break;
+
+               list_del_init(&f->fr_link);
+               fid_hash_del(f);
+               free(f);
+               head.lh_cached_count--;
+       }
+
+       return rc;
+}
+
+/**
+ * Decide whether cached records have to be synced now, and sync them.
+ *
+ * Above the high watermark a batch of opt.o_batch_sync_cnt records is
+ * synced.  Otherwise the first record of the list is synced alone when
+ * it is older than opt.o_min_age, and the check is repeated for the
+ * next record for as long as that keeps succeeding.
+ *
+ * \retval 0 when nothing had to be done or everything succeeded
+ * \retval the error returned by lsom_start_update() otherwise
+ */
+static int lsom_check_sync(void)
+{
+       int rc = 0;
+       int count;
+
+       do {
+               count = 0;
+               if (list_empty(&head.lh_list))
+                       return 0;
+
+               if (head.lh_cached_count <= opt.o_cached_fid_hiwm) {
+                       struct fid_rec *first;
+                       time_t now = time(NULL);
+
+                       /* A first record that has been waiting for more
+                        * than o_min_age is popped and handled right away.
+                        * NOTE(review): fr_time >> 30 is presumably the
+                        * seconds part of the changelog time - confirm.
+                        */
+                       first = list_entry(head.lh_list.next, struct fid_rec,
+                                          fr_link);
+                       if (now > ((first->fr_time >> 30) + opt.o_min_age))
+                               count = 1;
+               } else {
+                       count = opt.o_batch_sync_cnt;
+               }
+
+               if (count > 0)
+                       rc = lsom_start_update(count);
+       } while (rc == 0 && count == 1);
+
+       return rc;
+}
+
+/*
+ * Restore the ordering of head.lh_list after the index of \a f was
+ * raised: walk forward from \a f over every record with a smaller
+ * index and, if any was passed, re-insert \a f right before the
+ * position where the walk stopped.
+ */
+static void lsom_sort_record_list(struct fid_rec *f)
+{
+       struct list_head *pos = f->fr_link.next;
+       bool passed_any = false;
+
+       while (pos != &head.lh_list) {
+               struct fid_rec *other;
+
+               other = list_entry(pos, struct fid_rec, fr_link);
+               if (f->fr_index <= other->fr_index)
+                       break;
+
+               passed_any = true;
+               pos = pos->next;
+       }
+
+       if (passed_any)
+               list_move_tail(&f->fr_link, pos);
+}
+
+/**
+ * Handle one changelog record.
+ *
+ * CL_CLOSE, CL_TRUNC and CL_SETATTR records put their target FID into
+ * the cache: a FID seen for the first time gets a new entry at the tail
+ * of the ordered list, a FID that is already cached only has its index
+ * updated and its list position fixed.  Every record, whatever its
+ * type, is reported at debug level.
+ *
+ * \param[in] rec      changelog record to process
+ *
+ * \retval 0 on success
+ * \retval -ENOMEM when a new cache entry cannot be allocated
+ */
+static int process_record(struct changelog_rec *rec)
+{
+       const __u64 index = rec->cr_index;
+       struct fid_rec *f;
+       bool tracked;
+
+       tracked = rec->cr_type == CL_CLOSE || rec->cr_type == CL_TRUNC ||
+                 rec->cr_type == CL_SETATTR;
+       if (tracked) {
+               f = fid_hash_find(&rec->cr_tfid);
+               if (f != NULL) {
+                       /* known FID: keep the newest index and re-sort */
+                       f->fr_index = index;
+                       lsom_sort_record_list(f);
+               } else {
+                       f = malloc(sizeof(struct fid_rec));
+                       if (f == NULL) {
+                               llapi_error(LLAPI_MSG_ERROR, -ENOMEM,
+                                           "failed to alloc memory for fid_rec");
+                               return -ENOMEM;
+                       }
+
+                       f->fr_fid = rec->cr_tfid;
+                       f->fr_time = rec->cr_time;
+                       f->fr_index = index;
+                       INIT_HLIST_NODE(&f->fr_node);
+                       fid_hash_add(f);
+                       /*
+                        * Changelog records are processed in ascending
+                        * index order, so a new entry always belongs at
+                        * the tail of the ordered list.
+                        */
+                       list_add_tail(&f->fr_link, &head.lh_list);
+                       head.lh_cached_count++;
+               }
+       }
+
+       llapi_printf(LLAPI_MSG_DEBUG, "Processed changelog record index:%llu "
+                    "type:%s(0x%x) FID:"DFID"\n", index,
+                    changelog_type2str(__le32_to_cpu(rec->cr_type)),
+                    __le32_to_cpu(rec->cr_type), PFID(&rec->cr_tfid));
+
+       return 0;
+}
+
+/**
+ * Compute the FID cache size as a percentage of the total memory.
+ *
+ * \param[in] pct      percentage of the total memory to use; the caller
+ *                     in main() passes a value below 100
+ *
+ * \retval the cache size in bytes, capped at the largest unsigned long;
+ *         DEF_CACHE_SIZE when the total memory cannot be queried
+ */
+static unsigned long get_fid_cache_size(int pct)
+{
+       struct sysinfo sinfo;
+       unsigned long long cache_size;
+
+       if (sysinfo(&sinfo) != 0) {
+               /* sysinfo() returns -1, the actual cause is in errno */
+               llapi_error(LLAPI_MSG_ERROR, -errno, "failed to get sysinfo");
+               /* ignore this error, just pick some reasonable static
+                * limit for the cache size (e.g. 256MB, default value).
+                */
+               return DEF_CACHE_SIZE;
+       }
+
+       /* The maximum cached FID size is tuned according to the total
+        * memory size, e.g. 5% of the memory.  totalram is counted in
+        * units of mem_unit bytes.  Dividing before multiplying keeps the
+        * intermediate value from overflowing.
+        */
+       cache_size = (unsigned long long)sinfo.totalram * sinfo.mem_unit;
+       cache_size = cache_size / 100 * pct;
+
+       /* the result may not fit an unsigned long on 32-bit systems */
+       if (cache_size > ~0UL)
+               cache_size = ~0UL;
+
+       return cache_size;
+}
+
+int main(int argc, char **argv)
+{
+       int                      c;
+       int                      rc;
+       void                    *chglog_hdlr;
+       struct changelog_rec    *rec;
+       bool                     stop = 0;
+       int                      ret = 0;
+       unsigned long            cache_size = DEF_CACHE_SIZE;
+       char                     fsname[MAX_OBD_NAME + 1];
+       static struct option options[] = {
+               { "mdt", required_argument, NULL, 'm' },
+               { "user", required_argument, 0, 'u'},
+               { "daemonize", no_argument, NULL, 'd'},
+               { "interval", required_argument, NULL, 'i'},
+               { "min-age", required_argument, NULL, 'a'},
+               { "max-cache", required_argument, NULL, 'c'},
+               { "verbose", no_argument, NULL, 'v'},
+               { "sync", no_argument, NULL, 's'},
+               { "help", no_argument, NULL, 'h' },
+               { NULL }
+       };
+
+       memset(&opt, 0, sizeof(opt));
+       opt.o_data_sync = false;
+       opt.o_verbose = LLAPI_MSG_INFO;
+       opt.o_intv = CHLG_POLL_INTV;
+       opt.o_min_age = REC_MIN_AGE;
+
+       while ((c = getopt_long(argc, argv, "u:hm:dsi:a:c:v", options, NULL))
+              != EOF) {
+               switch (c) {
+               default:
+                       rc = -EINVAL;
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "%s: unknown option '-%c'\n",
+                                   argv[0], optopt);
+                       return rc;
+               case 'u':
+                       opt.o_chlg_user = optarg;
+                       break;
+               case 'h':
+                       usage(argv[0]);
+                       break;
+               case 'm':
+                       opt.o_mdtname = optarg;
+                       break;
+               case 'd':
+                       opt.o_daemonize = true;
+                       break;
+               case 'i':
+                       opt.o_intv = atoi(optarg);
+                       if (opt.o_intv < 0) {
+                               rc = -EINVAL;
+                               llapi_error(LLAPI_MSG_ERROR, rc,
+                                           "bad value for -i %s", optarg);
+                               return rc;
+                       }
+                       break;
+               case 'a':
+                       opt.o_min_age = atoi(optarg);
+                       if (opt.o_min_age < 0) {
+                               rc = -EINVAL;
+                               llapi_error(LLAPI_MSG_ERROR, rc,
+                                           "bad value for -a %s", optarg);
+                               return rc;
+                       }
+                       break;
+               case 'c':
+                       rc = Parser_size(&cache_size, optarg);
+                       if (rc < 0) {
+                               rc = -EINVAL;
+                               llapi_error(LLAPI_MSG_ERROR, rc,
+                                           "bad valud for -c '%s'", optarg);
+                               return rc;
+                       }
+
+                       /* A value below 100 is interpreted as a percentage
+                        * of total memory rather than an absolute size.
+                        */
+                       if (cache_size < 100)
+                               cache_size = get_fid_cache_size(cache_size);
+                       llapi_printf(LLAPI_MSG_INFO, "Cache size: %lu\n",
+                                    cache_size);
+                       break;
+               case 'v':
+                       opt.o_verbose++;
+                       break;
+               case 's':
+                       opt.o_data_sync = true;
+                       break;
+               }
+       }
+
+       if (argc != optind + 1) {
+               llapi_err_noerrno(LLAPI_MSG_ERROR,
+                                 "%s: no mount point specified\n", argv[0]);
+               usage(argv[0]);
+       }
+
+       opt.o_mntpt = argv[optind];
+       rc = llapi_search_fsname(opt.o_mntpt, fsname);
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot find a Lustre file system mounted at '%s'",
+                           opt.o_mntpt);
+               return rc;
+       }
+
+       if (!opt.o_mdtname)
+               usage(argv[0]);
+
+       if (!opt.o_chlg_user)
+               usage(argv[0]);
+
+       if (opt.o_daemonize) {
+               rc = daemon(1, 1);
+               if (rc < 0) {
+                       rc = -errno;
+                       llapi_error(LLAPI_MSG_ERROR, rc, "cannot daemonize");
+                       return rc;
+               }
+
+               setbuf(stdout, NULL);
+       }
+
+       opt.o_cached_fid_hiwm = cache_size / sizeof(struct fid_rec);
+       opt.o_batch_sync_cnt = opt.o_cached_fid_hiwm / 2;
+
+       rc = lsom_setup();
+       if (rc < 0)
+               return rc;
+
+       while (!stop) {
+               bool eof = false;
+
+               llapi_printf(LLAPI_MSG_DEBUG, "Start receiving records\n");
+               rc = llapi_changelog_start(&chglog_hdlr,
+                                          CHANGELOG_FLAG_BLOCK |
+                                          CHANGELOG_FLAG_JOBID |
+                                          CHANGELOG_FLAG_EXTRA_FLAGS,
+                                          opt.o_mdtname, 0);
+               if (rc) {
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "unable to open changelog of MDT [%s]\n",
+                                   opt.o_mdtname);
+                       return rc;
+               }
+
+               while (!eof && !stop) {
+                       rc = llapi_changelog_recv(chglog_hdlr, &rec);
+                       switch (rc) {
+                       case 0:
+                               rc = process_record(rec);
+                               if (rc) {
+                                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                                   "failed to process record");
+                                       ret = rc;
+                               }
+
+                               llapi_changelog_free(&rec);
+
+                               rc = lsom_check_sync();
+                               if (rc) {
+                                       stop = true;
+                                       ret = rc;
+                               }
+
+                               break;
+                       case 1: /* EOF */
+                               llapi_printf(LLAPI_MSG_DEBUG,
+                                            "finished reading [%s]\n",
+                                            opt.o_mdtname);
+                               eof = true;
+                               break;
+                       case -EINVAL: /* FS unmounted */
+                       case -EPROTO:  /* error in KUC channel */
+                       default:
+                               stop = true;
+                               llapi_error(LLAPI_MSG_ERROR, rc,
+                                           "failed to get changelog record");
+                               ret = rc;
+                               break;
+                       }
+               }
+
+               /* receive loop ended (EOF or stop); close the changelog */
+               rc = llapi_changelog_fini(&chglog_hdlr);
+               if (rc) {
+                       llapi_error(LLAPI_MSG_ERROR, rc,
+                                   "unable to close changelog of MDT [%s]",
+                                   opt.o_mdtname);
+                       ret = rc;
+                       return rc;
+               }
+
+               if (opt.o_daemonize) {
+                       sleep(opt.o_intv);
+
+                       rc = lsom_check_sync();
+                       if (rc) {
+                               stop = true;
+                               ret = rc;
+                       }
+               } else {
+                       lsom_start_update(head.lh_cached_count);
+                       stop = true;
+               }
+       }
+
+       lsom_cleanup();
+       return ret;
+}