/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 only, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License version 2 for more details (a copy is included * in the LICENSE file that accompanied this code). * * You should have received a copy of the GNU General Public License * version 2 along with this program; if not, see * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ /* * Copyright (c) 2017, DDN Storage Corporation. */ /* * lustre/utils/llsom_sync.c * * Tool for sync the LSOM xattr. * * Author: Qian Yingjin */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define container_of(ptr, type, member) ({ \ const typeof(((type *) 0)->member) * __mptr = (ptr); \ (type *) ((char *) __mptr - offsetof(type, member)); }) #define CHLG_POLL_INTV 60 #define REC_MIN_AGE 600 #define DEF_CACHE_SIZE (256 * 1048576) /* 256MB */ struct options { const char *o_chlg_user; const char *o_mdtname; const char *o_mntpt; bool o_daemonize; bool o_data_sync; int o_verbose; int o_intv; int o_min_age; unsigned long o_cached_fid_hiwm; /* high watermark */ unsigned long o_batch_sync_cnt; }; struct options opt; struct fid_rec { struct hlist_node fr_node; struct list_head fr_link; lustre_fid fr_fid; __u64 fr_time; __u64 fr_index; }; static const int fid_hash_shift = 6; #define FID_HASH_ENTRIES (1 << fid_hash_shift) #define FID_ON_HASH(f) (!hlist_unhashed(&(f)->fr_node)) #if __BITS_PER_LONG == 32 #define FID_HASH_FN(f) (hash_long(fid_flatten32(f), fid_hash_shift)) #elif __BITS_PER_LONG == 64 #define FID_HASH_FN(f) (hash_long(fid_flatten(f), fid_hash_shift)) #else #error Wordsize not 32 or 64 #endif struct lsom_head { struct hlist_head *lh_hash; struct list_head lh_list; /* ordered list by record index */ unsigned long lh_cached_count; } head; static void usage(char *prog) { printf("\nUsage: %s [options] -u -m \n" "options:\n" "\t-d, --daemonize\n" "\t-i, --interval, poll interval in second\n" "\t-a, --min-age, min age before a record is processed.\n" "\t-c, --max-cache, percentage of the memroy used for cache.\n" "\t-s, --sync, data sync when update LSOM xattr\n" "\t-v, --verbose, produce more verbose ouput\n", prog); exit(0); } static inline __u64 fid_flatten(const struct lu_fid *fid) { __u64 ino; __u64 seq; if (fid_is_igif(fid)) { ino = lu_igif_ino(fid); return ino; } seq = fid_seq(fid); ino = (seq << 24) + ((seq >> 24) & 0xffffff0000ULL) + fid_oid(fid); return ino ?: fid_oid(fid); } /** * map fid to 32 bit value for ino on 32bit systems. */ static inline __u32 fid_flatten32(const struct lu_fid *fid) { __u32 ino; __u64 seq; if (fid_is_igif(fid)) { ino = lu_igif_ino(fid); return ino; } seq = fid_seq(fid) - FID_SEQ_START; /* Map the high bits of the OID into higher bits of the inode number so * that inodes generated at about the same time have a reduced chance * of collisions. This will give a period of 2^12 = 1024 unique clients * (from SEQ) and up to min(LUSTRE_SEQ_MAX_WIDTH, 2^20) = 128k objects * (from OID), or up to 128M inodes without collisions for new files. */ ino = ((seq & 0x000fffffULL) << 12) + ((seq >> 8) & 0xfffff000) + (seq >> (64 - (40-8)) & 0xffffff00) + (fid_oid(fid) & 0xff000fff) + ((fid_oid(fid) & 0x00fff000) << 8); return ino ?: fid_oid(fid); } static inline bool fid_eq(const lustre_fid *f1, const lustre_fid *f2) { return f1->f_seq == f2->f_seq && f1->f_oid == f2->f_oid && f1->f_ver == f2->f_ver; } static void fid_hash_del(struct fid_rec *f) { if (FID_ON_HASH(f)) hlist_del_init(&f->fr_node); } static void fid_hash_add(struct fid_rec *f) { assert(!FID_ON_HASH(f)); hlist_add_head(&f->fr_node, &head.lh_hash[FID_HASH_FN(&f->fr_fid)]); } static struct fid_rec *fid_hash_find(const lustre_fid *fid) { struct hlist_head *hash_list; struct hlist_node *entry, *next; struct fid_rec *f; hash_list = &head.lh_hash[FID_HASH_FN(fid)]; hlist_for_each_entry_safe(f, entry, next, hash_list, fr_node) { assert(FID_ON_HASH(f)); if (fid_eq(fid, &f->fr_fid)) return f; } return NULL; } static int lsom_setup(void) { int i; /* set llapi message level */ llapi_msg_set_level(opt.o_verbose); memset(&head, 0, sizeof(head)); head.lh_hash = malloc(sizeof(struct hlist_head) * FID_HASH_ENTRIES); if (head.lh_hash == NULL) { llapi_err_noerrno(LLAPI_MSG_ERROR, "failed to alloc memory for hash (%zu).", sizeof(struct hlist_head) * FID_HASH_ENTRIES); return -ENOMEM; } for (i = 0; i < FID_HASH_ENTRIES; i++) INIT_HLIST_HEAD(&head.lh_hash[i]); INIT_LIST_HEAD(&head.lh_list); return 0; } static void lsom_cleanup(void) { free(head.lh_hash); } static int lsom_update_one(struct fid_rec *f) { struct stat st; int fd; int rc = 0; fd = llapi_open_by_fid(opt.o_mntpt, &f->fr_fid, O_RDONLY | O_NOATIME); if (fd < 0) { rc = -errno; /* The file may be deleted, clean the corresponding * changelog record and ignore this error. */ if (rc == -ENOENT) goto clean_up; llapi_error(LLAPI_MSG_ERROR, rc, "llapi_open_by_fid for " DFID " failed", PFID(&f->fr_fid)); return rc; } if (opt.o_data_sync) { __u64 dv; /* Flush dirty pages from clients */ rc = llapi_get_data_version(fd, &dv, LL_DV_RD_FLUSH); if (rc < 0) llapi_error(LLAPI_MSG_ERROR, errno, "failed to sync data for " DFID, PFID(&f->fr_fid)); /* ignore this error, continue to sync lsom data */ } rc = fstat(fd, &st); if (rc < 0) { llapi_error(LLAPI_MSG_ERROR, rc, "failed to stat FID: " DFID, PFID(&f->fr_fid)); return rc; } /* After call fstat(), it already gets OST attrs to the client, * when close the file, MDS will update the LSOM data itself * according the size and blocks information from the client. */ close(fd); llapi_printf(LLAPI_MSG_DEBUG, "record %llu:%llu, updated LSOM for fid " DFID " size:%lu blocks:%lu\n", f->fr_time, f->fr_index, PFID(&f->fr_fid), st.st_size, st.st_blocks); clean_up: rc = llapi_changelog_clear(opt.o_mdtname, opt.o_chlg_user, f->fr_index); if (rc) llapi_error(LLAPI_MSG_ERROR, rc, "failed to clear changelog record: %s:%llu", opt.o_chlg_user, f->fr_index); return rc; } static int lsom_start_update(int count) { int rc = 0; int i = 0; llapi_printf(LLAPI_MSG_INFO, "Start to sync %d records.\n", count); while (i < count) { struct fid_rec *f; f = list_entry(head.lh_list.next, struct fid_rec, fr_link); rc = lsom_update_one(f); if (rc == 0) { list_del_init(&f->fr_link); fid_hash_del(f); free(f); head.lh_cached_count--; i++; } else { goto out; } } out: return rc; } static int lsom_check_sync(void) { int rc = 0; int count; repeated: count = 0; if (list_empty(&head.lh_list)) return 0; if (head.lh_cached_count > opt.o_cached_fid_hiwm) count = opt.o_batch_sync_cnt; else { struct fid_rec *f; time_t now; /* When the first record in the list was not being * processed for a long time (more than o_min_age), * pop the record, start to handle it immediately. */ now = time(NULL); f = list_entry(head.lh_list.next, struct fid_rec, fr_link); if (now > ((f->fr_time >> 30) + opt.o_min_age)) count = 1; } if (count > 0) rc = lsom_start_update(count); if (rc == 0 && count == 1) goto repeated; return rc; } static void lsom_sort_record_list(struct fid_rec *f) { struct list_head *pos; bool need_move = false; for (pos = f->fr_link.next; pos != &head.lh_list; pos = pos->next) { struct fid_rec *rec = list_entry(pos, struct fid_rec, fr_link); if (f->fr_index > rec->fr_index) { need_move = true; continue; } else { break; } } if (need_move) list_move_tail(&f->fr_link, pos); } static int process_record(struct changelog_rec *rec) { __u64 index = rec->cr_index; int rc = 0; if (rec->cr_type == CL_CLOSE || rec->cr_type == CL_TRUNC || rec->cr_type == CL_SETATTR) { struct fid_rec *f; f = fid_hash_find(&rec->cr_tfid); if (f == NULL) { f = malloc(sizeof(struct fid_rec)); if (f == NULL) { rc = -ENOMEM; llapi_error(LLAPI_MSG_ERROR, rc, "failed to alloc memory for fid_rec"); return rc; } f->fr_fid = rec->cr_tfid; f->fr_index = index; f->fr_time = rec->cr_time; INIT_HLIST_NODE(&f->fr_node); fid_hash_add(f); /* * The newly changelog record index is processed in the * ascending order, so it is safe to put the record at * the tail of the ordered list. */ list_add_tail(&f->fr_link, &head.lh_list); head.lh_cached_count++; } else { f->fr_index = index; lsom_sort_record_list(f); } } llapi_printf(LLAPI_MSG_DEBUG, "Processed changelog record index:%llu " "type:%s(0x%x) FID:"DFID"\n", index, changelog_type2str(__le32_to_cpu(rec->cr_type)), __le32_to_cpu(rec->cr_type), PFID(&rec->cr_tfid)); return rc; } static unsigned long get_fid_cache_size(int pct) { struct sysinfo sinfo; unsigned long cache_size; int rc; rc = sysinfo(&sinfo); if (rc) { llapi_error(LLAPI_MSG_ERROR, rc, "failed to get sysinfo"); /* ignore this error, just pick some reasonable static * limit for the cache size (e.g. 256MB, default value). */ cache_size = DEF_CACHE_SIZE; } else { /* maximum cached fid size is tunned according to total * memory size, e.g. 5% of the memroy. */ cache_size = sinfo.totalram * pct / 100; } return cache_size; } int main(int argc, char **argv) { int c; int rc; void *chglog_hdlr; struct changelog_rec *rec; bool stop = 0; int ret = 0; unsigned long cache_size = DEF_CACHE_SIZE; char fsname[MAX_OBD_NAME + 1]; static struct option options[] = { { "mdt", required_argument, NULL, 'm' }, { "user", required_argument, 0, 'u'}, { "daemonize", no_argument, NULL, 'd'}, { "interval", required_argument, NULL, 'i'}, { "min-age", required_argument, NULL, 'a'}, { "max-cache", required_argument, NULL, 'c'}, { "verbose", no_argument, NULL, 'v'}, { "sync", no_argument, NULL, 's'}, { "help", no_argument, NULL, 'h' }, { NULL } }; memset(&opt, 0, sizeof(opt)); opt.o_data_sync = false; opt.o_verbose = LLAPI_MSG_INFO; opt.o_intv = CHLG_POLL_INTV; opt.o_min_age = REC_MIN_AGE; while ((c = getopt_long(argc, argv, "u:hm:dsi:a:c:v", options, NULL)) != EOF) { switch (c) { default: rc = -EINVAL; llapi_error(LLAPI_MSG_ERROR, rc, "%s: unknown option '-%c'\n", argv[0], optopt); return rc; case 'u': opt.o_chlg_user = optarg; break; case 'h': usage(argv[0]); break; case 'm': opt.o_mdtname = optarg; break; case 'd': opt.o_daemonize = true; break; case 'i': opt.o_intv = atoi(optarg); if (opt.o_intv < 0) { rc = -EINVAL; llapi_error(LLAPI_MSG_ERROR, rc, "bad value for -i %s", optarg); return rc; } break; case 'a': opt.o_min_age = atoi(optarg); if (opt.o_min_age < 0) { rc = -EINVAL; llapi_error(LLAPI_MSG_ERROR, rc, "bad value for -a %s", optarg); return rc; } break; case 'c': rc = Parser_size(&cache_size, optarg); if (rc < 0) { rc = -EINVAL; llapi_error(LLAPI_MSG_ERROR, rc, "bad valud for -c '%s'", optarg); return rc; } /* For value < 100, it is taken as the percentage of * total memory instead. */ if (cache_size < 100) cache_size = get_fid_cache_size(cache_size); llapi_printf(LLAPI_MSG_INFO, "Cache size: %lu\n", cache_size); break; case 'v': opt.o_verbose++; break; case 's': opt.o_data_sync = true; break; } } if (argc != optind + 1) { llapi_err_noerrno(LLAPI_MSG_ERROR, "%s: no mount point specified\n", argv[0]); usage(argv[0]); } opt.o_mntpt = argv[optind]; rc = llapi_search_fsname(opt.o_mntpt, fsname); if (rc < 0) { llapi_error(LLAPI_MSG_ERROR, rc, "cannot find a Lustre file system mounted at '%s'", opt.o_mntpt); return rc; } if (!opt.o_mdtname) usage(argv[0]); if (!opt.o_chlg_user) usage(argv[0]); if (opt.o_daemonize) { rc = daemon(1, 1); if (rc < 0) { rc = -errno; llapi_error(LLAPI_MSG_ERROR, rc, "cannot daemonize"); return rc; } setbuf(stdout, NULL); } opt.o_cached_fid_hiwm = cache_size / sizeof(struct fid_rec); opt.o_batch_sync_cnt = opt.o_cached_fid_hiwm / 2; rc = lsom_setup(); if (rc < 0) return rc; while (!stop) { bool eof = false; llapi_printf(LLAPI_MSG_DEBUG, "Start receiving records\n"); rc = llapi_changelog_start(&chglog_hdlr, CHANGELOG_FLAG_BLOCK | CHANGELOG_FLAG_JOBID | CHANGELOG_FLAG_EXTRA_FLAGS, opt.o_mdtname, 0); if (rc) { llapi_error(LLAPI_MSG_ERROR, rc, "unable to open changelog of MDT [%s]\n", opt.o_mdtname); return rc; } while (!eof && !stop) { rc = llapi_changelog_recv(chglog_hdlr, &rec); switch (rc) { case 0: rc = process_record(rec); if (rc) { llapi_error(LLAPI_MSG_ERROR, rc, "failed to process record"); ret = rc; } llapi_changelog_free(&rec); rc = lsom_check_sync(); if (rc) { stop = true; ret = rc; } break; case 1: /* EOF */ llapi_printf(LLAPI_MSG_DEBUG, "finished reading [%s]\n", opt.o_mdtname); eof = true; break; case -EINVAL: /* FS unmounted */ case -EPROTO: /* error in KUC channel */ default: stop = true; llapi_error(LLAPI_MSG_ERROR, rc, "failed to get changelog record"); ret = rc; break; } } /* reach EOF of changelog */ rc = llapi_changelog_fini(&chglog_hdlr); if (rc) { llapi_error(LLAPI_MSG_ERROR, rc, "unable to close changelog of MDT [%s]", opt.o_mdtname); ret = rc; return rc; } if (opt.o_daemonize) { sleep(opt.o_intv); rc = lsom_check_sync(); if (rc) { stop = true; ret = rc; } } else { lsom_start_update(head.lh_cached_count); stop = true; } } lsom_cleanup(); return ret; }