Whamcloud - gitweb
LU-8648 all: remove all Sun license and URL references
[fs/lustre-release.git] / lustre / utils / lustre_rsync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/utils/lustre_rsync.c
33  *
34  * Author: Kalpak Shah <Kalpak.Shah@Sun.COM>
35  * Author: Manoj Joseph <Manoj.Joseph@Sun.COM>
36  */
37
38 /*
39  * - lustre_rsync is a tool for replicating a lustre filesystem.
40  *
41  * - The source-fs is a live lustre filesystem. It is not a
42  * snapshot. It is mounted and undergoing changes
43  *
44  * - The target-fs is a copy of the source-fs from the past. Let's
45  * call this point, the 'sync point'.
46  *
47  * - There is a changelog of all metadata operations that happened on
48  * the filesystem since the 'sync point'.
49  *
50  * - lustre_rsync replicates all the operations saved in the changelog
51  * on to the target filesystem to make it identical to the source.
52  *
53  * To facilitate replication, the lustre filesystem provides
54  *    a) a way to get the current filesystem path of a given FID
55  *    b) a way to open files by specifying its FID
56  *
57  * The changelog only has a limited amount of information.
58  *  tfid - The FID of the target file
59  *  pfid - The FID of the parent of the target file (at the time of
60  *         the operation)
61  *  sfid - The FID of the source file
62  *  spfid - The FID of the parent of the source file
63  *  name - The name of the target file (at the time of the operation), the name
64  *         of the source file is appended (delimited with '\0') if this
65  *         operation involves a source
66  *
67  * With just this information, it is not always possible to determine
68  * the file paths for each operation. For instance, if pfid does not
69  * exist on the source-fs (due to a subsequent deletion), its path
70  * cannot be queried. In such cases, lustre_rsync keeps the files in a
71  * special directory ("/.lustrerepl"). Once all the operations in a
72  * changelog are replayed, all the files in this special directory
73  * will get moved to the location as in the source-fs.
74  *
75  * Shorthand used: f2p(fid) = fid2path(fid)
76  *
77  * The following are the metadata operations of interest.
78  * 1. creat
79  *    If tfid is absent on the source-fs, ignore this operation
80  *    If pfid is absent on the source-fs [or]
81  *    if f2p(pfid) is not present on target-fs [or]
82  *    if f2p(pfid)+name != f2p(tfid)
83  *      creat .lustrerepl/tfid
84  *      track [pfid,tfid,name]
85  *    Else
86  *      creat f2p[tfid]
87  *
88  * 2. remove
89  *    If .lustrerepl/[tfid] is present on the target
90  *      rm .lustrerepl/[tfid]
91  *    Else if pfid is present on the source-fs,
92  *      if f2p(pfid)+name is present,
93  *        rm f2p(pfid)+name
94  *
95  * 3. move (spfid,sname) to (pfid,name)
96  *    If pfid is present
97  *      if spfid is also present, mv (spfid,sname) to (pfid,name)
98  *      else mv .lustrerepl/[sfid] to (pfid,name)
99  *    Else if pfid is not present,
100  *      if spfid is present, mv (spfid,sname) .lustrerepl/[sfid]
101  *    If moving out of .lustrerepl
102  *      move out all its children in .lustrerepl.
103  *      [pfid,tfid,name] tracked from (1) is used for this.
104  */
105
106 #include <stdio.h>
107 #include <stdlib.h>
108 #include <string.h>
109 #include <unistd.h>
110 #include <getopt.h>
111 #include <stdarg.h>
112 #include <fcntl.h>
113 #include <signal.h>
114 #include <sys/stat.h>
115 #include <sys/types.h>
116 #include <errno.h>
117 #include <limits.h>
118 #include <utime.h>
119 #include <time.h>
120 #include <sys/xattr.h>
121
122 #include <libcfs/util/string.h>
123 #include <libcfs/util/parser.h>
124 #include <lustre/lustreapi.h>
125 #include <lustre/lustre_idl.h>
126 #include "lustre_rsync.h"
127
/* NOTE(review): presumably the format version stored in the status log;
 * it is not referenced in the part of the file reviewed here -- confirm. */
#define REPLICATE_STATUS_VER 1
/* NOTE(review): presumably the number of records processed between two
 * changelog clears -- confirm against the replication loop. */
#define CLEAR_INTERVAL 100
/* Initial value of rsync_threshold: lr_copy_data() hands files larger than
 * this many bytes to rsync when an rsync command is configured. */
#define DEFAULT_RSYNC_THRESHOLD 0xA00000 /* 10 MB */

#define TYPE_STR_LEN 16

#define DEFAULT_MDT "-MDT0000"
/* Holding directory on each target for files whose final path cannot be
 * determined yet; entries are named after the file's FID. */
#define SPECIAL_DIR ".lustrerepl"
#define RSYNC "rsync"
#define TYPE "type"

/* Debug flags: levels passed to lr_debug(), which prints a message only
 * when its level does not exceed the global 'debug'. */
#define DINFO 1
#define DTRACE 2

/* Not used; declared for fulfilling obd.c's dependency. */
command_t cmdlist[0];
extern int obd_initialize(int argc, char **argv);
146
/* Information for processing a changelog record. This structure is
   allocated on the heap instead of allocating large variables on the
   stack. The same instance also carries scratch buffers that the
   helper functions grow on demand and reuse. */
struct lr_info {
        long long recno;                /* record number (printed by DEBUG_ENTRY) */
        int target_no;                  /* index into status->ls_targets of the
                                           target currently being updated */
        unsigned int is_extended:1;     /* NOTE(review): not used in the code
                                           reviewed here; presumably marks a
                                           record with source info -- confirm */
        enum changelog_rec_type type;   /* operation type (CL_CREATE, ...) */
        char tfid[LR_FID_STR_LEN];      /* FID of the target file */
        char pfid[LR_FID_STR_LEN];      /* FID of the target's parent */
        char sfid[LR_FID_STR_LEN];      /* FID of the source file */
        char spfid[LR_FID_STR_LEN];     /* FID of the source's parent */
        char sname[NAME_MAX + 1];       /* name of the source file */
        char name[NAME_MAX + 1];        /* name of the target file */
        char src[PATH_MAX + 1];         /* path read from (also used as scratch
                                           space by lr_get_symlink() and
                                           lr_cascade_move()) */
        char dest[PATH_MAX + 1];        /* path written to on the target */
        char path[PATH_MAX + 1];        /* result of lr_get_path() */
        char savedpath[PATH_MAX + 1];   /* copy of 'path' kept by lr_create() */
        char link[PATH_MAX + 1];        /* symlink content to create on target */
        char linktmp[PATH_MAX + 1];     /* raw readlink() result */
        char cmd[PATH_MAX];             /* shell command line buffer */
        int bufsize;                    /* current size of 'buf' */
        char *buf;                      /* data copy buffer (lr_copy_data()) */

        /* Variables for querying the xattributes */
        char *xlist;                    /* buffer for the xattr name list */
        size_t xsize;                   /* size of 'xlist' */
        char *xvalue;                   /* buffer for one xattr value */
        size_t xvsize;                  /* size of 'xvalue' */
};
177
/* Node of the singly linked list headed by the global 'parents'. Each node
   records one [pfid, tfid, name] triple for a file that was created under
   SPECIAL_DIR (see lr_add_pc()). */
struct lr_parent_child_list {
        struct lr_parent_child_log pc_log;
        struct lr_parent_child_list *pc_next;
};
182
/* Replication state; the code in this file reads the source mount point
   (ls_source) and the target list (ls_targets, ls_num_targets) from it. */
struct lustre_rsync_status *status;
char *statuslog;  /* Name of the status log file */
int logbackedup;
int noxattr;    /* Flag to turn off replicating xattrs */
int noclear;    /* Flag to turn off clearing changelogs */
int debug;      /* Flag to turn debugging information on and off */
int verbose;    /* Verbose output */
long long rec_count; /* No of changelog records that were processed */
int errors;     /* Count of replication failures reported so far */
int dryrun;
int use_rsync;  /* Flag to turn on use of rsync to copy data */
/* Files larger than this are handed to rsync by lr_copy_data() */
long long rsync_threshold = DEFAULT_RSYNC_THRESHOLD;
/* NOTE(review): the comment below says this is set on receipt of a signal;
   if it is written from a signal handler it should be
   'volatile sig_atomic_t' rather than plain int -- confirm. */
int quit;       /* Flag to stop processing the changelog; set on the
                   receipt of a signal */
int abort_on_err = 0;

/* rsync command line prefix; an empty string disables the rsync hand-off
   in lr_copy_data() */
char rsync[PATH_MAX];
char rsync_ver[PATH_MAX];
/* Head of the [pfid, tfid, name] list maintained by lr_add_pc() */
struct lr_parent_child_list *parents;

/* Destination of lr_debug() output; stdout is used when this is NULL */
FILE *debug_log;
204
/* Command line options: getopt_long() table mapping each long option to
   the short option letter it is reported as. The all-zero entry terminates
   the table. */
struct option long_opts[] = {
        {"source",      required_argument, 0, 's'},
        {"target",      required_argument, 0, 't'},
        {"mdt",         required_argument, 0, 'm'},
        {"user",        required_argument, 0, 'u'},
        {"statuslog",   required_argument, 0, 'l'},
        {"verbose",     no_argument,       0, 'v'},
        {"xattr",       required_argument, 0, 'x'},
        {"dry-run",     no_argument,       0, 'z'},
        /* Undocumented options follow */
        {"cl-clear",    required_argument, 0, 'c'},
        {"use-rsync",   no_argument,       0, 'r'},
        {"rsync-threshold", required_argument, 0, 'y'},
        {"start-recno", required_argument, 0, 'n'},
        {"abort-on-err",no_argument,       0, 'a'},
        {"debug",       required_argument, 0, 'd'},
        {"debuglog",    required_argument, 0, 'D'},
        {0, 0, 0, 0}
};
225
/*
 * Print the command line usage summary to stderr.
 *
 * The user id is given with -u (long option --user); -r selects
 * --use-rsync, so the synopsis must not advertise -r for the user id.
 */
void lr_usage()
{
        fprintf(stderr, "\tlustre_rsync -s <lustre_root_path> -t <target_path> "
                "-m <mdt> -u <user id> -l <status log>\n"
                "lustre_rsync can also pick up parameters from a "
                "status log created earlier.\n"
                "\tlustre_rsync -l <log_file>\n"
                "options:\n"
                "\t--xattr <yes|no> replicate EAs\n"
                "\t--abort-on-err   abort at first err\n"
                "\t--verbose\n"
                "\t--dry-run        don't write anything\n");
}
240
/*
 * Trace macros printing the record number, operation type, target FID,
 * parent FID and name of a changelog record through lr_debug().
 * DEBUG_EXIT additionally prints the result code 'rc'.
 *
 * NOTE(review): both pass D_TRACE as the level, which is not one of the
 * debug flags defined in this file (DINFO/DTRACE); presumably it comes from
 * an included header -- confirm that its value gives the intended level.
 * NOTE(review): each expansion already ends with ';', so using these as the
 * sole body of an if/else would misparse -- check the callers.
 */
#define DEBUG_ENTRY(info)                                                      \
        lr_debug(D_TRACE, "***** Start %lld %s (%d) %s %s %s *****\n",         \
                 (info)->recno, changelog_type2str((info)->type),              \
                 (info)->type, (info)->tfid, (info)->pfid, (info)->name);

#define DEBUG_EXIT(info, rc)                                                   \
        lr_debug(D_TRACE, "##### End %lld %s (%d) %s %s %s rc=%d #####\n",     \
                 (info)->recno, changelog_type2str((info)->type),              \
                 (info)->type, (info)->tfid, (info)->pfid, (info)->name, rc);
250
251 /* Print debug information. This is controlled by the value of the
252    global variable 'debug' */
253 void lr_debug(int level, const char *fmt, ...)
254 {
255         va_list ap;
256
257         if (level > debug)
258                 return;
259
260         va_start(ap, fmt);
261         if (debug_log != NULL)
262                 vfprintf(debug_log, fmt, ap);
263         else
264                 vfprintf(stdout, fmt, ap);
265         va_end(ap);
266 }
267
268
/*
 * Resize 'buf' to 'size' bytes.
 *
 * Returns the (possibly moved) buffer on success. On failure the old buffer
 * is released and NULL is returned, so callers can assign the result back
 * to the pointer they passed in without leaking.
 *
 * A non-positive size is treated as a failed request: realloc() with a zero
 * size may itself free the buffer and return NULL, which would make the
 * free() below a double free.
 */
void * lr_grow_buf(void *buf, int size)
{
        void *ptr;

        if (size <= 0) {
                free(buf);
                return NULL;
        }

        ptr = realloc(buf, size);
        if (ptr == NULL)
                free(buf);
        return ptr;
}
278
279
280 /* Use rsync to replicate file data */
281 int lr_rsync_data(struct lr_info *info)
282 {
283         int rc;
284         struct stat st_src, st_dest;
285         char cmd[PATH_MAX];
286
287         lr_debug(DTRACE, "Syncing data%s\n", info->tfid);
288
289         rc = stat(info->src, &st_src);
290         if (rc == -1) {
291                 fprintf(stderr, "Error: Unable to stat src=%s %s\n",
292                         info->src, info->name);
293                 if (errno == ENOENT)
294                         return 0;
295                 else
296                         return -errno;
297         }
298         rc = stat(info->dest, &st_dest);
299         if (rc == -1) {
300                 fprintf(stderr, "Error: Unable to stat dest=%s\n",
301                         info->dest);
302                 return -errno;
303         }
304
305         if (st_src.st_mtime != st_dest.st_mtime ||
306             st_src.st_size != st_dest.st_size) {
307                 /* XXX spawning off an rsync for every data sync and
308                  * waiting synchronously is bad for performance.
309                  * librsync could possibly used here. But it does not
310                  * seem to be of production grade. Multi-threaded
311                  * replication is also to be considered.
312                  */
313                 int status;
314                 snprintf(cmd, PATH_MAX, "%s --inplace %s %s", rsync, info->src,
315                         info->dest);
316                 lr_debug(DTRACE, "\t%s %s\n", cmd, info->tfid);
317                 status = system(cmd);
318                 if (status == -1) {
319                         rc = -errno;
320                 } else if (WIFEXITED(status)) {
321                         status = WEXITSTATUS(status);
322                         if (!status)
323                                 rc = 0;
324                         else if (status == 23 || status == 24)
325                                 /* Error due to vanished source files;
326                                    Ignore this error*/
327                                 rc = 0;
328                         else
329                                 rc = -EINVAL;
330                         if (status)
331                                 lr_debug(DINFO, "rsync %s exited with %d %d\n",
332                                          info->src, status, rc);
333                 } else {
334                         rc = -EINTR;
335                 }
336         } else {
337                 lr_debug(DTRACE, "Not syncing %s and %s %s\n", info->src,
338                          info->dest, info->tfid);
339         }
340
341         return rc;
342 }
343
344 int lr_copy_data(struct lr_info *info)
345 {
346         int fd_src = -1;
347         int fd_dest = -1;
348         int bufsize;
349         int rsize;
350         int rc = 0;
351         struct stat st_src;
352         struct stat st_dest;
353
354         fd_src = open(info->src, O_RDONLY);
355         if (fd_src == -1)
356                 return -errno;
357         if (fstat(fd_src, &st_src) == -1 ||
358             stat(info->dest, &st_dest) == -1)
359                 goto out;
360
361         if (st_src.st_mtime == st_dest.st_mtime &&
362             st_src.st_size == st_dest.st_size)
363                 goto out;
364
365         if (st_src.st_size > rsync_threshold && rsync[0] != '\0') {
366                 /* It is more efficient to use rsync to replicate
367                    large files. Any file larger than rsync_threshold
368                    is handed off to rsync. */
369                 lr_debug(DTRACE, "Using rsync to replicate %s\n", info->tfid);
370                 rc = lr_rsync_data(info);
371                 goto out;
372         }
373
374         fd_dest = open(info->dest, O_WRONLY | O_TRUNC, st_src.st_mode);
375         if (fd_dest == -1) {
376                 rc = -errno;
377                 goto out;
378         }
379         bufsize = st_dest.st_blksize;
380
381         if (info->bufsize < bufsize) {
382                 /* Grow buffer */
383                 info->buf = lr_grow_buf(info->buf, bufsize);
384                 if (info->buf == NULL) {
385                         rc = -ENOMEM;
386                         goto out;
387                 }
388                 info->bufsize = bufsize;
389         }
390
391         while (1) {
392                 char *buf;
393                 int wsize;
394
395                 buf = info->buf;
396                 rsize = read(fd_src, buf, bufsize);
397                 if (rsize == 0) {
398                         rc = 0;
399                         break;
400                 }
401                 if (rsize < 0) {
402                         rc = -errno;
403                         break;
404                 }
405                 do {
406                         wsize = write(fd_dest, buf, rsize);
407                         if (wsize <= 0) {
408                                 rc = -errno;
409                                 break;
410                         }
411                         rsize -= wsize;
412                         buf += wsize;
413                 } while (rsize > 0);
414         }
415         fsync(fd_dest);
416
417 out:
418         if (fd_src != -1)
419                 close(fd_src);
420         if (fd_dest != -1)
421                 close(fd_dest);
422
423         return rc;
424 }
425
426 /* Copy data from source to destination */
427 int lr_sync_data(struct lr_info *info)
428 {
429         if (use_rsync)
430                 return lr_rsync_data(info);
431         else
432                 return lr_copy_data(info);
433 }
434
/*
 * Copy mode, ownership and access/modification times from file 'src' to
 * file 'dest'.
 *
 * The steps run in the order stat, chmod, chown, utime and stop at the
 * first failure. Returns 0 on success or the negative errno of the step
 * that failed.
 */
int lr_copy_attr(char *src, char *dest)
{
        struct stat src_st;
        struct utimbuf stamps;

        if (stat(src, &src_st) == -1)
                return -errno;
        if (chmod(dest, src_st.st_mode) == -1)
                return -errno;
        if (chown(dest, src_st.st_uid, src_st.st_gid) == -1)
                return -errno;

        stamps.actime = src_st.st_atime;
        stamps.modtime = src_st.st_mtime;

        return utime(dest, &stamps) == -1 ? -errno : 0;
}
452
453 /* Copy all xattrs from file info->src to info->dest */
454 int lr_copy_xattr(struct lr_info *info)
455 {
456         size_t size = info->xsize;
457         int start;
458         int len;
459         int rc;
460
461         if (noxattr)
462                 return 0;
463
464         errno = 0;
465         rc = llistxattr(info->src, info->xlist, size);
466         lr_debug(DTRACE, "llistxattr(%s,%p) returned %d, errno=%d\n",
467                  info->src, info->xlist, rc, errno);
468         if ((rc > 0 && info->xlist == NULL) || errno == ERANGE) {
469                 size = rc > PATH_MAX ? rc : PATH_MAX;
470                 info->xlist = lr_grow_buf(info->xlist, size);
471                 if (info->xlist == NULL)
472                         return -ENOMEM;
473                 info->xsize = size;
474                 rc = llistxattr(info->src, info->xlist, size);
475                 lr_debug(DTRACE, "llistxattr %s returned %d, errno=%d\n",
476                          info->src, rc, errno);
477         }
478         if (rc < 0)
479                 return rc;
480
481         len = rc;
482         start = 0;
483         while (start < len) {
484                 size = info->xvsize;
485                 rc = lgetxattr(info->src, info->xlist + start,
486                                info->xvalue, size);
487                 if (info->xvalue == NULL || errno == ERANGE) {
488                         size = rc > PATH_MAX ? rc : PATH_MAX;
489                         info->xvalue = lr_grow_buf(info->xvalue, size);
490                         if (info->xvalue == NULL)
491                                 return -ENOMEM;
492                         info->xvsize = size;
493                         rc = lgetxattr(info->src, info->xlist + start,
494                                        info->xvalue, size);
495                 }
496                 lr_debug(DTRACE, "\t(%s,%d) rc=%p\n", info->xlist + start,
497                          info->xvalue, rc);
498                 if (rc > 0) {
499                         size = rc;
500                         rc = lsetxattr(info->dest, info->xlist + start,
501                                        info->xvalue, size, 0);
502                         lr_debug(DTRACE, "\tlsetxattr(), rc=%d, errno=%d\n",
503                                  rc, errno);
504                         if (rc == -1) {
505                                 if (errno != ENOTSUP) {
506                                         fprintf(stderr, "Error replicating "
507                                                 " xattr for %s: %d\n",
508                                                 info->dest, errno);
509                                         errors++;
510                                 }
511                                 rc = 0;
512                         }
513                 }
514                 start += strlen(info->xlist + start) + 1;
515         }
516
517         lr_debug(DINFO, "setxattr: %s %s\n", info->src, info->dest);
518
519         return rc;
520 }
521
522 /* Retrieve the filesystem path for a given FID and a given
523    linkno. The path is returned in info->path */
524 int lr_get_path_ln(struct lr_info *info, char *fidstr, int linkno)
525 {
526         long long recno = -1;
527         int rc;
528
529         rc = llapi_fid2path(status->ls_source, fidstr, info->path,
530                             PATH_MAX, &recno, &linkno);
531         if (rc < 0 && rc != -ENOENT) {
532                 fprintf(stderr, "fid2path error: (%s, %s) %d %s\n",
533                         status->ls_source, fidstr, -rc, strerror(errno = -rc));
534         }
535
536         return rc;
537 }
538
/* Look up the path of FID 'fidstr' using link number 0. The path is stored
   in info->path; the return value is that of lr_get_path_ln(). */
int lr_get_path(struct lr_info *info, char *fidstr)
{
        const int first_link = 0;

        return lr_get_path_ln(info, fidstr, first_link);
}
545
546 /* Generate the path for opening by FID */
547 void lr_get_FID_PATH(char *mntpt, char *fidstr, char *buf, int bufsize)
548 {
549         /* Open-by-FID path is <mntpt>/.lustre/fid/[SEQ:OID:VER] */
550         snprintf(buf, bufsize, "%s/%s/fid/%s", mntpt, dot_lustre_name,
551                  fidstr);
552         return;
553 }
554
555 /* Read the symlink information into 'info->link' */
556 int lr_get_symlink(struct lr_info *info)
557 {
558         int rc;
559         char *link;
560
561         lr_get_FID_PATH(status->ls_source, info->tfid, info->src, PATH_MAX);
562         rc = readlink(info->src, info->linktmp, PATH_MAX);
563         if (rc > 0)
564                 info->linktmp[rc] = '\0';
565         else
566                 return rc;
567         lr_debug(DTRACE, "symlink: readlink returned %s\n", info->linktmp);
568
569         if (strncmp(info->linktmp, status->ls_source,
570                     strlen(status->ls_source)) == 0) {
571                 /* Strip source fs path and replace with target fs path. */
572                 link = info->linktmp + strlen(status->ls_source);
573                 snprintf(info->src, PATH_MAX, "%s%s",
574                          status->ls_targets[info->target_no], link);
575                 link = info->src;
576         } else {
577                 link = info->linktmp;
578         }
579         strlcpy(info->link, link, sizeof(info->link));
580
581         return rc;
582 }
583
584 /* Create file/directory/device file/symlink. */
585 int lr_mkfile(struct lr_info *info)
586 {
587         struct stat st;
588         int rc = 0;
589
590         errno = 0;
591         lr_debug(DINFO, "mkfile(%d) %s \n", info->type, info->dest);
592         if (info->type == CL_MKDIR) {
593                 rc = mkdir(info->dest, 0777);
594         } else if (info->type == CL_SOFTLINK) {
595                 lr_get_symlink(info);
596                 rc = symlink(info->link, info->dest);
597         } else if (info->type == CL_MKNOD) {
598                 lr_get_FID_PATH(status->ls_source, info->tfid,
599                                     info->src, PATH_MAX);
600                 rc = stat(info->src, &st);
601                 if (rc == -1) {
602                         if (errno == ENOENT)
603                                 return 0;
604                         else
605                                 return -errno;
606                 }
607                 rc = mknod(info->dest, st.st_mode, st.st_rdev);
608         } else {
609                 rc = mknod(info->dest, S_IFREG | 0777, 0);
610         }
611
612         if (rc < 0) {
613                 if (errno == EEXIST)
614                         rc = 0;
615                 else
616                         return -errno;
617         }
618
619         /* Sync data and attributes */
620         if (info->type == CL_CREATE || info->type == CL_MKDIR) {
621                 lr_debug(DTRACE, "Syncing data and attributes %s\n",
622                          info->tfid);
623                 (void) lr_copy_xattr(info);
624                 if (info->type == CL_CREATE)
625                         rc = lr_sync_data(info);
626                 if (!rc)
627                         rc = lr_copy_attr(info->src, info->dest);
628
629                 if (rc == -ENOENT)
630                         /* Source file has disappeared. Not an error. */
631                         rc = 0;
632         } else {
633                 lr_debug(DTRACE, "Not syncing data and attributes %s\n",
634                          info->tfid);
635         }
636
637         return rc;
638 }
639
640 int lr_add_pc(const char *pfid, const char *tfid, const char *name)
641 {
642         struct lr_parent_child_list *p;
643         size_t len;
644
645         p = calloc(1, sizeof(*p));
646         if (p == NULL)
647                 return -ENOMEM;
648         len = strlcpy(p->pc_log.pcl_pfid, pfid, sizeof(p->pc_log.pcl_pfid));
649         if (len >= sizeof(p->pc_log.pcl_pfid))
650                 goto out_err;
651         len = strlcpy(p->pc_log.pcl_tfid, tfid, sizeof(p->pc_log.pcl_tfid));
652         if (len >= sizeof(p->pc_log.pcl_tfid))
653                 goto out_err;
654         len = strlcpy(p->pc_log.pcl_name, name, sizeof(p->pc_log.pcl_name));
655         if (len >= sizeof(p->pc_log.pcl_name))
656                 goto out_err;
657
658         p->pc_next = parents;
659         parents = p;
660         return 0;
661
662 out_err:
663         free(p);
664         return -E2BIG;
665 }
666
667 void lr_cascade_move(const char *fid, const char *dest, struct lr_info *info)
668 {
669         struct lr_parent_child_list *curr, *prev;
670         char *d;
671         int rc;
672
673         d = calloc(1, PATH_MAX + 1);
674         prev = curr = parents;
675         while (curr) {
676                 if (strcmp(curr->pc_log.pcl_pfid, fid) == 0) {
677                         snprintf(d, PATH_MAX, "%s/%s", dest,
678                                  curr->pc_log.pcl_name);
679                         snprintf(info->src, PATH_MAX, "%s/%s/%s",
680                                 status->ls_targets[info->target_no],
681                                 SPECIAL_DIR, curr->pc_log.pcl_tfid);
682                         rc = rename(info->src, d);
683                         if (rc == -1) {
684                                 fprintf(stderr, "Error renaming file "
685                                         " %s to %s: %d\n",
686                                         info->src, d, errno);
687                                 errors++;
688                         }
689                         if (curr == parents)
690                                 parents = curr->pc_next;
691                         else
692                                 prev->pc_next = curr->pc_next;
693                         lr_cascade_move(curr->pc_log.pcl_tfid, d, info);
694                         free(curr);
695                         prev = curr = parents;
696
697                 } else {
698                         prev = curr;
699                         curr = curr->pc_next;
700                 }
701         }
702
703         free(d);
704 }
705
706 /* remove [info->spfid, info->sfid] from parents */
707 int lr_remove_pc(const char *pfid, const char *tfid)
708 {
709         struct lr_parent_child_list *curr, *prev;
710
711         for (prev = curr = parents; curr; prev = curr, curr = curr->pc_next) {
712                 if (strcmp(curr->pc_log.pcl_pfid, pfid) == 0 &&
713                     strcmp(curr->pc_log.pcl_tfid, tfid) == 0) {
714                         if (curr == parents)
715                                 parents = curr->pc_next;
716                         else
717                                 prev->pc_next = curr->pc_next;
718                         free(curr);
719                         break;
720                 }
721         }
722         return 0;
723 }
724
725 /* Create file under SPECIAL_DIR with its tfid as its name. */
726 int lr_mk_special(struct lr_info *info)
727 {
728         int rc;
729
730         snprintf(info->dest, PATH_MAX, "%s/%s/%s",
731                 status->ls_targets[info->target_no], SPECIAL_DIR,
732                 info->tfid);
733
734         rc = lr_mkfile(info);
735         if (rc)
736                 return rc;
737
738         rc = lr_add_pc(info->pfid, info->tfid, info->name);
739         return rc;
740 }
741
742 /* Remove a file or directory */
743 int lr_rmfile(struct lr_info *info)
744 {
745         int rc;
746
747         if (info->type == CL_RMDIR)
748                 rc = rmdir(info->dest);
749         else
750                 rc = unlink(info->dest);
751         if (rc == -1)
752                 rc = -errno;
753         return rc;
754 }
755
756 /* Recursively remove directory and its contents */
757 int lr_rm_recursive(struct lr_info *info)
758 {
759         int rc;
760
761         snprintf(info->cmd, PATH_MAX, "rm -rf %s", info->dest);
762         rc = system(info->cmd);
763         if (rc == -1)
764                 rc = -errno;
765
766         return rc;
767 }
768
769 /* Remove a file under SPECIAL_DIR with its tfid as its name. */
770 int lr_rm_special(struct lr_info *info)
771 {
772         int rc;
773
774         snprintf(info->dest, PATH_MAX, "%s/%s/%s",
775                  status->ls_targets[info->target_no], SPECIAL_DIR,
776                  info->tfid);
777         rc = lr_rmfile(info);
778
779         if (rc)
780                 lr_debug(DINFO, "remove: %s; rc=%d, errno=%d\n",
781                          info->dest, rc, errno);
782         return rc;
783 }
784
785 /* Replicate file and directory create events */
786 int lr_create(struct lr_info *info)
787 {
788         int len;
789         int rc1 = 0;
790         int rc;
791         int mkspecial = 0;
792
793         /* Is target FID present on the source? */
794         rc = lr_get_path(info, info->tfid);
795         if (rc == -ENOENT) {
796                 /* Source file has disappeared. Not an error. */
797                 lr_debug(DINFO, "create: tfid %s not found on"
798                          "source-fs\n", info->tfid);
799                 return 0;
800         } else if (rc) {
801                 return rc;
802         }
803         strcpy(info->savedpath, info->path);
804
805         /* Is parent FID present on the source */
806         rc = lr_get_path(info, info->pfid);
807         if (rc == -ENOENT) {
808                 lr_debug(DINFO, "create: pfid %s not found on source-fs\n",
809                          info->tfid);
810                 mkspecial = 1;
811         } else if (rc < 0) {
812                 return rc;
813         }
814
815         /* Is f2p(pfid)+name != f2p(tfid)? If not the file has moved. */
816         len = strlen(info->path);
817         if (len == 1 && info->path[0] == '/')
818                 snprintf(info->dest, PATH_MAX, "%s", info->name);
819         else if (len - 1 > 0 && info->path[len - 1] == '/')
820                 snprintf(info->dest, PATH_MAX, "%s%s", info->path, info->name);
821         else
822                 snprintf(info->dest, PATH_MAX, "%s/%s", info->path, info->name);
823
824         lr_debug(DTRACE, "dest = %s; savedpath = %s\n", info->dest,
825                  info->savedpath);
826         if (strncmp(info->dest, info->savedpath, PATH_MAX) != 0) {
827                 lr_debug(DTRACE, "create: file moved (%s). %s != %s\n",
828                          info->tfid, info->dest, info->savedpath);
829                 mkspecial = 1;
830         }
831
832         /* Is f2p(pfid) present on the target? If not, the parent has
833            moved */
834         if (!mkspecial) {
835                 snprintf(info->dest, PATH_MAX, "%s/%s", status->ls_targets[0],
836                         info->path);
837                 if (access(info->dest, F_OK) != 0) {
838                         lr_debug(DTRACE, "create: parent %s not found\n",
839                                 info->dest);
840                         mkspecial = 1;
841                 }
842         }
843         for (info->target_no = 0; info->target_no < status->ls_num_targets;
844              info->target_no++) {
845                 snprintf(info->dest, PATH_MAX, "%s/%s",
846                         status->ls_targets[info->target_no], info->savedpath);
847                 lr_get_FID_PATH(status->ls_source, info->tfid, info->src,
848                                     PATH_MAX);
849
850                 if (!mkspecial)
851                         rc1 = lr_mkfile(info);
852                 if (mkspecial || rc1 == -ENOENT) {
853                         rc1 = lr_mk_special(info);
854                 }
855                 if (rc1)
856                         rc = rc1;
857         }
858         return rc;
859 }
860
861 /* Replicate a file remove (rmdir/unlink) operation */
862 int lr_remove(struct lr_info *info)
863 {
864         int rc = 0;
865         int rc1;
866
867         for (info->target_no = 0; info->target_no < status->ls_num_targets;
868              info->target_no++) {
869
870                 rc1 = lr_rm_special(info);
871                 if (!rc1)
872                         continue;
873
874                 rc1 = lr_get_path(info, info->pfid);
875                 if (rc1 == -ENOENT) {
876                         lr_debug(DINFO, "remove: pfid %s not found\n",
877                                  info->pfid);
878                         continue;
879                 }
880                 if (rc1) {
881                         rc = rc1;
882                         continue;
883                 }
884                 snprintf(info->dest, PATH_MAX, "%s/%s/%s",
885                         status->ls_targets[info->target_no], info->path,
886                         info->name);
887
888                 rc1 = lr_rmfile(info);
889                 lr_debug(DINFO, "remove: %s; rc1=%d, errno=%d\n",
890                          info->dest, rc1, errno);
891                 if (rc1 == -ENOTEMPTY)
892                         rc1 = lr_rm_recursive(info);
893
894                 if (rc1) {
895                         rc = rc1;
896                         continue;
897                 }
898         }
899         return rc;
900 }
901
902 /* Replicate a rename/move operation. */
903 int lr_move(struct lr_info *info)
904 {
905         int rc = 0;
906         int rc1;
907         int rc_dest, rc_src;
908         int special_src = 0;
909         int special_dest = 0;
910         char srcpath[PATH_MAX + 1] = "";
911
912         LASSERT(info->is_extended);
913
914         rc_src = lr_get_path(info, info->spfid);
915         if (rc_src < 0 && rc_src != -ENOENT)
916                 return rc_src;
917         memcpy(srcpath, info->path, strlen(info->path));
918
919         rc_dest = lr_get_path(info, info->pfid);
920         if (rc_dest < 0 && rc_dest != -ENOENT)
921                 return rc_dest;
922
923         for (info->target_no = 0; info->target_no < status->ls_num_targets;
924              info->target_no++) {
925
926                 if (!rc_dest) {
927                         snprintf(info->dest, PATH_MAX, "%s/%s",
928                                 status->ls_targets[info->target_no],
929                                 info->path);
930                         if (access(info->dest, F_OK) != 0) {
931                                 rc_dest = -errno;
932                         } else {
933                                 snprintf(info->dest, PATH_MAX, "%s/%s/%s",
934                                         status->ls_targets[info->target_no],
935                                         info->path, info->name);
936                         }
937                         lr_debug(DINFO, "dest path %s rc_dest=%d\n", info->dest,
938                                  rc_dest);
939                 }
940                 if (rc_dest == -ENOENT) {
941                         snprintf(info->dest, PATH_MAX, "%s/%s/%s",
942                                 status->ls_targets[info->target_no],
943                                 SPECIAL_DIR, info->sfid);
944                         special_dest = 1;
945                         lr_debug(DINFO, "special dest %s\n", info->dest);
946                 }
947
948                 if (!rc_src) {
949                         snprintf(info->src, PATH_MAX, "%s/%s/%s",
950                                 status->ls_targets[info->target_no],
951                                 srcpath, info->sname);
952                         lr_debug(DINFO, "src path %s rc_src=%d\n", info->src,
953                                  rc_src);
954                 }
955                 if (rc_src == -ENOENT || (access(info->src, F_OK) != 0 &&
956                                           errno == ENOENT)) {
957                         snprintf(info->src, PATH_MAX, "%s/%s/%s",
958                                 status->ls_targets[info->target_no],
959                                 SPECIAL_DIR, info->sfid);
960                         special_src = 1;
961                         lr_debug(DINFO, "special src %s\n", info->src);
962                 }
963
964                 rc1 = 0;
965                 errno = 0;
966                 if (strcmp(info->src, info->dest) != 0) {
967                         rc1 = rename(info->src, info->dest);
968                         if (rc1 == -1)
969                                 rc1 = -errno;
970                         lr_debug(DINFO, "rename returns %d\n", rc1);
971                 }
972
973                 if (special_src)
974                         rc1 = lr_remove_pc(info->spfid, info->sfid);
975
976                 if (!special_dest)
977                         lr_cascade_move(info->sfid, info->dest, info);
978                 else
979                         rc1 = lr_add_pc(info->pfid, info->sfid, info->name);
980
981                 lr_debug(DINFO, "move: %s [to] %s rc1=%d, errno=%d\n",
982                          info->src, info->dest, rc1, errno);
983                 if (rc1)
984                         rc = rc1;
985         }
986         return rc;
987 }
988
989 /* Replicate a hard link */
990 int lr_link(struct lr_info *info)
991 {
992         int i;
993         int rc;
994         int rc1;
995         struct stat st;
996
997         lr_get_FID_PATH(status->ls_source, info->tfid, info->src, PATH_MAX);
998         rc = stat(info->src, &st);
999         if (rc == -1)
1000                 return -errno;
1001
1002         for (info->target_no = 0; info->target_no < status->ls_num_targets;
1003              info->target_no++) {
1004
1005                 info->src[0] = 0;
1006                 info->dest[0] = 0;
1007                 rc1 = 0;
1008
1009                 /*
1010                  * The changelog record has the new parent directory FID and
1011                  * name of the target file. So info->dest can be constructed
1012                  * by getting the path of the new parent directory and
1013                  * appending the target file name.
1014                  */
1015                 rc1 = lr_get_path(info, info->pfid);
1016                 lr_debug(rc1 ? 0 : DTRACE, "\tparent fid2path %s, %s, rc=%d\n",
1017                          info->path, info->name, rc1);
1018
1019                 if (rc1 == 0) {
1020                         snprintf(info->dest, sizeof(info->dest), "%s/%s/%s",
1021                                  status->ls_targets[info->target_no],
1022                                  info->path, info->name);
1023                         lr_debug(DINFO, "link destination is %s\n", info->dest);
1024                 }
1025
1026                 /* Search through the hardlinks to get the src */
1027                 for (i = 0; i < st.st_nlink && info->src[0] == 0; i++) {
1028                         rc1 = lr_get_path_ln(info, info->tfid, i);
1029                         lr_debug(rc1 ? 0:DTRACE, "\tfid2path %s, %s, %d rc=%d\n",
1030                                  info->path, info->name, i, rc1);
1031                         if (rc1)
1032                                 break;
1033
1034                         /*
1035                          * Compare the path of target FID with info->dest
1036                          * to find out info->src.
1037                          */
1038                         char srcpath[PATH_MAX];
1039
1040                         snprintf(srcpath, sizeof(srcpath), "%s/%s",
1041                                  status->ls_targets[info->target_no],
1042                                  info->path);
1043
1044                         if (strcmp(srcpath, info->dest) != 0) {
1045                                 strlcpy(info->src, srcpath, sizeof(info->src));
1046                                 lr_debug(DINFO, "link source is %s\n",
1047                                          info->src);
1048                         }
1049                 }
1050
1051                 if (rc1) {
1052                         rc = rc1;
1053                         continue;
1054                 }
1055
1056                 if (info->src[0] == 0)
1057                         snprintf(info->src, PATH_MAX, "%s/%s/%s",
1058                                 status->ls_targets[info->target_no],
1059                                 SPECIAL_DIR, info->tfid);
1060                 else if (info->dest[0] == 0)
1061                         snprintf(info->dest, PATH_MAX, "%s/%s/%s",
1062                                 status->ls_targets[info->target_no],
1063                                 SPECIAL_DIR, info->tfid);
1064
1065                 rc1 = link(info->src, info->dest);
1066                 lr_debug(DINFO, "link: %s [to] %s; rc1=%d %s\n",
1067                          info->src, info->dest, rc1,
1068                          strerror(rc1 ? errno : 0));
1069
1070                 if (rc1)
1071                         rc = rc1;
1072         }
1073         return rc;
1074 }
1075
1076 /* Replicate file attributes */
1077 int lr_setattr(struct lr_info *info)
1078 {
1079         int rc1;
1080         int rc;
1081
1082         lr_get_FID_PATH(status->ls_source, info->tfid, info->src, PATH_MAX);
1083
1084         rc = lr_get_path(info, info->tfid);
1085         if (rc == -ENOENT)
1086                 lr_debug(DINFO, "setattr: %s not present on source-fs\n",
1087                          info->src);
1088         if (rc)
1089                 return rc;
1090
1091         for (info->target_no = 0; info->target_no < status->ls_num_targets;
1092              info->target_no++) {
1093
1094                 snprintf(info->dest, PATH_MAX, "%s/%s",
1095                          status->ls_targets[info->target_no], info->path);
1096                 lr_debug(DINFO, "setattr: %s %s %s", info->src, info->dest,
1097                          info->tfid);
1098
1099                 rc1 = lr_sync_data(info);
1100                 if (!rc1)
1101                         rc1 = lr_copy_attr(info->src, info->dest);
1102                 if (rc1)
1103                         rc = rc1;
1104         }
1105         return rc;
1106 }
1107
1108 /* Replicate xattrs */
1109 int lr_setxattr(struct lr_info *info)
1110 {
1111         int rc, rc1;
1112
1113         lr_get_FID_PATH(status->ls_source, info->tfid, info->src, PATH_MAX);
1114
1115         rc = lr_get_path(info, info->tfid);
1116         if (rc == -ENOENT)
1117                 lr_debug(DINFO, "setxattr: %s not present on source-fs\n",
1118                          info->src);
1119         if (rc)
1120                 return rc;
1121
1122         for (info->target_no = 0; info->target_no < status->ls_num_targets;
1123              info->target_no++) {
1124
1125                 snprintf(info->dest, PATH_MAX, "%s/%s",
1126                         status->ls_targets[info->target_no], info->path);
1127                 lr_debug(DINFO, "setxattr: %s %s %s\n", info->src, info->dest,
1128                          info->tfid);
1129
1130                 rc1 = lr_copy_xattr(info);
1131                 if (rc1)
1132                         rc = rc1;
1133         }
1134
1135         return rc;
1136 }
1137
1138 /* Parse a line of changelog entry */
1139 int lr_parse_line(void *priv, struct lr_info *info)
1140 {
1141         struct changelog_rec            *rec;
1142         struct changelog_ext_rename     *rnm;
1143         size_t                           namelen;
1144         size_t                           copylen = sizeof(info->name);
1145
1146         if (llapi_changelog_recv(priv, &rec) != 0)
1147                 return -1;
1148
1149         info->is_extended = !!(rec->cr_flags & CLF_RENAME);
1150         info->recno = rec->cr_index;
1151         info->type = rec->cr_type;
1152         snprintf(info->tfid, sizeof(info->tfid), DFID, PFID(&rec->cr_tfid));
1153         snprintf(info->pfid, sizeof(info->pfid), DFID, PFID(&rec->cr_pfid));
1154
1155         namelen = strnlen(changelog_rec_name(rec), rec->cr_namelen);
1156         if (copylen > namelen + 1)
1157                 copylen = namelen + 1;
1158         strlcpy(info->name, changelog_rec_name(rec), copylen);
1159
1160         /* Don't use rnm if CLF_RENAME isn't set */
1161         rnm = changelog_rec_rename(rec);
1162         if (rec->cr_flags & CLF_RENAME && !fid_is_zero(&rnm->cr_sfid)) {
1163                 copylen = sizeof(info->sname);
1164
1165                 snprintf(info->sfid, sizeof(info->sfid), DFID,
1166                          PFID(&rnm->cr_sfid));
1167                 snprintf(info->spfid, sizeof(info->spfid), DFID,
1168                          PFID(&rnm->cr_spfid));
1169                 namelen = changelog_rec_snamelen(rec);
1170                 if (copylen > namelen + 1)
1171                         copylen = namelen + 1;
1172                 strlcpy(info->sname, changelog_rec_sname(rec), copylen);
1173
1174                 if (verbose > 1)
1175                         printf("Rec %lld: %d %s %s\n", info->recno, info->type,
1176                                 info->name, info->sname);
1177         } else {
1178                 if (verbose > 1)
1179                         printf("Rec %lld: %d %s\n", info->recno, info->type,
1180                                 info->name);
1181         }
1182
1183         llapi_changelog_free(&rec);
1184
1185         rec_count++;
1186         return 0;
1187 }
1188
1189 /* Initialize the replication parameters */
1190 int lr_init_status()
1191 {
1192         size_t size = sizeof(struct lustre_rsync_status) + PATH_MAX + 1;
1193
1194         if (status != NULL)
1195                 return 0;
1196         status = calloc(size, 1);
1197         if (status == NULL)
1198                 return -ENOMEM;
1199         status->ls_version = REPLICATE_STATUS_VER;
1200         status->ls_size = size;
1201         status->ls_last_recno = -1;
1202         return 0;
1203 }
1204
1205 /* Make a backup of the statuslog */
1206 void lr_backup_log()
1207 {
1208         char backupfile[PATH_MAX];
1209
1210         if (logbackedup)
1211                 return;
1212         snprintf(backupfile, PATH_MAX, "%s.old", statuslog);
1213         (void) rename(statuslog, backupfile);
1214         logbackedup = 1;
1215
1216         return;
1217 }
1218
1219 /* Save replication parameters to a statuslog. */
1220 int lr_write_log()
1221 {
1222         int fd;
1223         size_t size;
1224         size_t write_size = status->ls_size;
1225         struct lr_parent_child_list *curr;
1226         int rc = 0;
1227
1228         if (statuslog == NULL)
1229                 return 0;
1230
1231         lr_backup_log();
1232
1233         fd = open(statuslog, O_WRONLY | O_CREAT | O_SYNC,
1234                              S_IRUSR | S_IWUSR);
1235         if (fd == -1) {
1236                 fprintf(stderr, "Error opening log file for writing (%s)\n",
1237                         statuslog);
1238                 return -1;
1239         }
1240         errno = 0;
1241         size = write(fd, status, write_size);
1242         if (size != write_size) {
1243                 fprintf(stderr, "Error writing to log file (%s) %d\n",
1244                         statuslog, errno);
1245                 close(fd);
1246                 return -1;
1247         }
1248
1249         for (curr = parents; curr; curr = curr->pc_next) {
1250                 size = write(fd, &curr->pc_log, sizeof(curr->pc_log));
1251                 if (size != sizeof(curr->pc_log)) {
1252                         fprintf(stderr, "Error writing to log file (%s) %d\n",
1253                                 statuslog, errno);
1254                         rc = -1;
1255                         break;
1256                 }
1257         }
1258         close(fd);
1259         return rc;
1260 }
1261
1262 /* Read statuslog and populate the replication parameters.  Command
1263  * line parameters take precedence over parameters in the log file.*/
1264 int lr_read_log()
1265 {
1266         struct lr_parent_child_list *tmp;
1267         struct lr_parent_child_log rec;
1268         struct lustre_rsync_status *s;
1269         int fd = -1;
1270         size_t size;
1271         size_t read_size = sizeof(struct lustre_rsync_status) + PATH_MAX + 1;
1272         int rc = 0;
1273
1274         if (statuslog == NULL)
1275                 return 0;
1276
1277         s = calloc(1, read_size);
1278         if (s == NULL) {
1279                 rc = -ENOMEM;
1280                 goto out;
1281         }
1282
1283         fd = open(statuslog, O_RDONLY);
1284         if (fd == -1) {
1285                 rc = -errno;
1286                 goto out;
1287         }
1288
1289         size = read(fd, s, read_size);
1290         if (size != read_size) {
1291                 rc = -EINVAL;
1292                 goto out;
1293         }
1294
1295         if (read_size < s->ls_size) {
1296                 read_size = s->ls_size;
1297                 s = lr_grow_buf(s, read_size);
1298                 if (s == NULL) {
1299                         rc = -ENOMEM;
1300                         goto out;
1301                 }
1302
1303                 if (lseek(fd, 0, SEEK_SET) == -1) {
1304                         rc = -ENOMEM;
1305                         goto out;
1306                 }
1307
1308                 size = read(fd, s, read_size);
1309                 if (size != read_size) {
1310                         rc = -EINVAL;
1311                         goto out;
1312                 }
1313         }
1314
1315         while (read(fd, &rec, sizeof(rec)) != 0) {
1316                 tmp = calloc(1, sizeof(*tmp));
1317                 if (!tmp) {
1318                         rc = -ENOMEM;
1319                         goto out;
1320                 }
1321
1322                 tmp->pc_log = rec;
1323                 tmp->pc_next = parents;
1324                 parents = tmp;
1325         }
1326
1327         /* copy uninitialized fields to status */
1328         if (status->ls_num_targets == 0) {
1329                 if (status->ls_size != s->ls_size) {
1330                         status = lr_grow_buf(status, s->ls_size);
1331                         if (status == NULL) {
1332                                 rc = -ENOMEM;
1333                                 goto out;
1334                         }
1335
1336                         status->ls_size = s->ls_size;
1337                 }
1338                 status->ls_num_targets = s->ls_num_targets;
1339                 memcpy(status->ls_targets, s->ls_targets,
1340                        (PATH_MAX + 1) * s->ls_num_targets);
1341         }
1342         if (status->ls_last_recno == -1)
1343                 status->ls_last_recno = s->ls_last_recno;
1344
1345         if (status->ls_registration[0] == '\0')
1346                 strlcpy(status->ls_registration, s->ls_registration,
1347                         sizeof(status->ls_registration));
1348
1349         if (status->ls_mdt_device[0] == '\0')
1350                 strlcpy(status->ls_mdt_device, s->ls_mdt_device,
1351                         sizeof(status->ls_mdt_device));
1352
1353         if (status->ls_source_fs[0] == '\0')
1354                 strlcpy(status->ls_source_fs, s->ls_source_fs,
1355                         sizeof(status->ls_source_fs));
1356
1357         if (status->ls_source[0] == '\0')
1358                 strlcpy(status->ls_source, s->ls_source,
1359                         sizeof(status->ls_source));
1360
1361  out:
1362         if (fd != -1)
1363                 close(fd);
1364         if (s)
1365                 free(s);
1366         return rc;
1367 }
1368
1369 /* Clear changelogs every CLEAR_INTERVAL records or at the end of
1370    processing. */
1371 int lr_clear_cl(struct lr_info *info, int force)
1372 {
1373         char            mdt_device[LR_NAME_MAXLEN + 1];
1374         long long       rec;
1375         int             rc = 0;
1376
1377         if (force || info->recno > status->ls_last_recno + CLEAR_INTERVAL) {
1378                 if (info->type == CL_RENAME)
1379                         rec = info->recno + 1;
1380                 else
1381                         rec = info->recno;
1382                 if (!noclear && !dryrun) {
1383                         /* llapi_changelog_clear modifies the mdt
1384                          * device name so make a copy of it until this
1385                          * is fixed.
1386                         */
1387                         strlcpy(mdt_device, status->ls_mdt_device,
1388                                 sizeof(mdt_device));
1389                         rc = llapi_changelog_clear(mdt_device,
1390                                                    status->ls_registration,
1391                                                    rec);
1392                         if (rc)
1393                                 printf("Changelog clear (%s, %s, %lld) "
1394                                        "returned %d\n", status->ls_mdt_device,
1395                                        status->ls_registration, rec, rc);
1396                 }
1397                 if (!rc && !dryrun) {
1398                         status->ls_last_recno = rec;
1399                         lr_write_log();
1400
1401                 }
1402         }
1403
1404         return rc;
1405 }
1406
1407 /* Locate a usable version of rsync. At this point we'll use any
1408    version. */
1409 int lr_locate_rsync()
1410 {
1411         FILE *fp;
1412         int len;
1413
1414         /* Locate rsync */
1415         snprintf(rsync, PATH_MAX, "%s -p %s", TYPE, RSYNC);
1416         fp = popen(rsync, "r");
1417         if (fp == NULL)
1418                 return -1;
1419
1420         if (fgets(rsync, PATH_MAX, fp) == NULL) {
1421                 fclose(fp);
1422                 return -1;
1423         }
1424
1425         len = strlen(rsync);
1426         if (len > 0 && rsync[len - 1] == '\n')
1427                 rsync[len - 1] = '\0';
1428         fclose(fp);
1429
1430         /* Determine the version of rsync */
1431         snprintf(rsync_ver, PATH_MAX, "%s --version", rsync);
1432         fp = popen(rsync_ver, "r");
1433         if (fp == NULL)
1434                 return -1;
1435
1436         if (fgets(rsync_ver, PATH_MAX, fp) == NULL) {
1437                 fclose(fp);
1438                 return -1;
1439         }
1440         len = strlen(rsync_ver);
1441         if (len > 0 && rsync_ver[len - 1] == '\n')
1442                 rsync_ver[len - 1] = '\0';
1443         fclose(fp);
1444
1445         return 0;
1446
1447 }
1448
1449 /* Print the replication parameters */
1450 void lr_print_status(struct lr_info *info)
1451 {
1452         int i;
1453
1454         if (!verbose)
1455                 return;
1456
1457         printf("Lustre filesystem: %s\n", status->ls_source_fs);
1458         printf("MDT device: %s\n", status->ls_mdt_device);
1459         printf("Source: %s\n", status->ls_source);
1460         for (i = 0; i < status->ls_num_targets; i++)
1461                 printf("Target: %s\n", status->ls_targets[i]);
1462         if (statuslog != NULL)
1463                 printf("Statuslog: %s\n", statuslog);
1464         printf("Changelog registration: %s\n", status->ls_registration);
1465         printf("Starting changelog record: %jd\n",
1466                (uintmax_t)status->ls_last_recno);
1467         if (noxattr)
1468                 printf("Replicate xattrs: no\n");
1469         if (noclear)
1470                 printf("Clear changelog after use: no\n");
1471         if (use_rsync)
1472                 printf("Using rsync: %s (%s)\n", rsync, rsync_ver);
1473 }
1474
1475 void lr_print_failure(struct lr_info *info, int rc)
1476 {
1477         fprintf(stderr, "Replication of operation failed(%d):"
1478                 " %lld %s (%d) %s %s %s\n", rc, info->recno,
1479                 changelog_type2str(info->type), info->type, info->tfid,
1480                 info->pfid, info->name);
1481 }
1482
1483 /* Replicate filesystem operations from src_path to target_path */
1484 int lr_replicate()
1485 {
1486         void *changelog_priv;
1487         struct lr_info *info;
1488         struct lr_info *ext = NULL;
1489         time_t start;
1490         int xattr_not_supp;
1491         int i;
1492         int rc;
1493
1494         start = time(NULL);
1495
1496         info = calloc(1, sizeof(struct lr_info));
1497         if (info == NULL)
1498                 return -ENOMEM;
1499
1500         rc = llapi_search_fsname(status->ls_source, status->ls_source_fs);
1501         if (rc) {
1502                 fprintf(stderr, "Source path is not a valid Lustre client "
1503                         "mountpoint.\n");
1504                 goto out;
1505         }
1506         if (status->ls_mdt_device[0] == '\0')
1507                 snprintf(status->ls_mdt_device, LR_NAME_MAXLEN, "%s%s",
1508                         status->ls_source_fs, DEFAULT_MDT);
1509
1510         ext = calloc(1, sizeof(struct lr_info));
1511         if (ext == NULL) {
1512                 rc = -ENOMEM;
1513                 goto out;
1514         }
1515
1516         for (i = 0, xattr_not_supp = 0; i < status->ls_num_targets; i++) {
1517                 snprintf(info->dest, PATH_MAX, "%s/%s", status->ls_targets[i],
1518                         SPECIAL_DIR);
1519                 rc = mkdir(info->dest, 0777);
1520                 if (rc == -1 && errno != EEXIST) {
1521                         fprintf(stderr, "Error writing to target path %s.\n",
1522                                 status->ls_targets[i]);
1523                         rc = -errno;
1524                         goto out;
1525                 }
1526                 rc = llistxattr(info->src, info->xlist, info->xsize);
1527                 if (rc == -1 && errno == ENOTSUP) {
1528                         fprintf(stderr, "xattrs not supported on %s\n",
1529                                 status->ls_targets[i]);
1530                         xattr_not_supp++;
1531                 }
1532         }
1533         if (xattr_not_supp == status->ls_num_targets)
1534                 /* None of the targets support xattrs. */
1535                 noxattr = 1;
1536
1537         lr_print_status(info);
1538
1539         /* Open changelogs for consumption*/
1540         rc = llapi_changelog_start(&changelog_priv,
1541                                 CHANGELOG_FLAG_BLOCK | CHANGELOG_FLAG_JOBID,
1542                                 status->ls_mdt_device, status->ls_last_recno);
1543         if (rc < 0) {
1544                 fprintf(stderr, "Error opening changelog file for fs %s.\n",
1545                         status->ls_source_fs);
1546                 goto out;
1547         }
1548
1549         while (!quit && lr_parse_line(changelog_priv, info) == 0) {
1550                 rc = 0;
1551                 if (info->type == CL_RENAME && !info->is_extended) {
1552                         /* Newer rename operations extends changelog to store
1553                          * source file information, but old changelog has
1554                          * another record.
1555                          */
1556                         if (lr_parse_line(changelog_priv, ext) != 0)
1557                                 break;
1558                         memcpy(info->sfid, info->tfid, sizeof(info->sfid));
1559                         memcpy(info->spfid, info->pfid, sizeof(info->spfid));
1560                         memcpy(info->tfid, ext->tfid, sizeof(info->tfid));
1561                         memcpy(info->pfid, ext->pfid, sizeof(info->pfid));
1562                         strlcpy(info->sname, info->name, sizeof(info->sname));
1563                         strlcpy(info->name, ext->name, sizeof(info->name));
1564                         info->is_extended = 1;
1565                 }
1566
1567                 if (dryrun)
1568                         continue;
1569
1570                 DEBUG_ENTRY(info);
1571
1572                 switch(info->type) {
1573                 case CL_CREATE:
1574                 case CL_MKDIR:
1575                 case CL_MKNOD:
1576                 case CL_SOFTLINK:
1577                         rc = lr_create(info);
1578                         break;
1579                 case CL_RMDIR:
1580                 case CL_UNLINK:
1581                         rc = lr_remove(info);
1582                         break;
1583                 case CL_RENAME:
1584                         rc = lr_move(info);
1585                         break;
1586                 case CL_HARDLINK:
1587                         rc = lr_link(info);
1588                         break;
1589                 case CL_TRUNC:
1590                 case CL_SETATTR:
1591                         rc = lr_setattr(info);
1592                         break;
1593                 case CL_XATTR:
1594                         rc = lr_setxattr(info);
1595                         break;
1596                 case CL_CLOSE:
1597                 case CL_EXT:
1598                 case CL_OPEN:
1599                 case CL_LAYOUT:
1600                 case CL_MARK:
1601                         /* Nothing needs to be done for these entries */
1602                         /* fallthrough */
1603                 default:
1604                         break;
1605                 }
1606                 DEBUG_EXIT(info, rc);
1607                 if (rc && rc != -ENOENT) {
1608                         lr_print_failure(info, rc);
1609                         errors++;
1610                         if (abort_on_err)
1611                                 break;
1612                 }
1613                 lr_clear_cl(info, 0);
1614                 if (debug) {
1615                         bzero(info, sizeof(struct lr_info));
1616                         bzero(ext, sizeof(struct lr_info));
1617                 }
1618         }
1619
1620         llapi_changelog_fini(&changelog_priv);
1621
1622         if (errors || verbose)
1623                 printf("Errors: %d\n", errors);
1624
1625         /* Clear changelog records used so far */
1626         lr_clear_cl(info, 1);
1627
1628         if (verbose) {
1629                 printf("lustre_rsync took %ld seconds\n", time(NULL) - start);
1630                 printf("Changelog records consumed: %lld\n", rec_count);
1631         }
1632
1633         rc = 0;
1634
1635 out:
1636         if (info != NULL)
1637                 free(info);
1638         if (ext != NULL)
1639                 free(ext);
1640
1641         return rc;
1642 }
1643
1644 void
1645 termination_handler (int signum)
1646 {
1647         /* Set a flag for the replicator to gracefully shutdown */
1648         quit = 1;
1649         printf("lustre_rsync halting.\n");
1650 }
1651
1652 int main(int argc, char *argv[])
1653 {
1654         int newsize;
1655         int numtargets = 0;
1656         int rc = 0;
1657
1658         if ((rc = lr_init_status()) != 0)
1659                 return rc;
1660
1661         while ((rc = getopt_long(argc, argv, "as:t:m:u:l:vx:zc:ry:n:d:D:",
1662                                  long_opts, NULL)) >= 0) {
1663                 switch (rc) {
1664                 case 'a':
1665                         /* Assume absolute paths */
1666                         abort_on_err++;
1667                         break;
1668                 case 's':
1669                         /* Assume absolute paths */
1670                         strlcpy(status->ls_source, optarg,
1671                                 sizeof(status->ls_source));
1672                         break;
1673                 case 't':
1674                         status->ls_num_targets++;
1675                         numtargets++;
1676                         if (numtargets != status->ls_num_targets) {
1677                                 /* Targets were read from a log
1678                                    file. The ones specified on the
1679                                    command line take precedence. The
1680                                    ones from the log file will be
1681                                    ignored. */
1682                                 status->ls_num_targets = numtargets;
1683                         }
1684                         newsize = sizeof (struct lustre_rsync_status) +
1685                                 (status->ls_num_targets * (PATH_MAX + 1));
1686                         if (status->ls_size != newsize) {
1687                                 status->ls_size = newsize;
1688                                 status = lr_grow_buf(status, newsize);
1689                                 if (status == NULL)
1690                                         return -ENOMEM;
1691                         }
1692                         strlcpy(status->ls_targets[status->ls_num_targets - 1],
1693                                 optarg, sizeof(status->ls_targets[0]));
1694                         break;
1695                 case 'm':
1696                         strlcpy(status->ls_mdt_device, optarg,
1697                                 sizeof(status->ls_mdt_device));
1698                         break;
1699                 case 'u':
1700                         strlcpy(status->ls_registration, optarg,
1701                                 sizeof(status->ls_registration));
1702                         break;
1703                 case 'l':
1704                         statuslog = optarg;
1705                         (void) lr_read_log();
1706                         break;
1707                 case 'v':
1708                         verbose++;
1709                         break;
1710                 case 'x':
1711                         if (strcmp("no", optarg) == 0) {
1712                                 noxattr = 1;
1713                         } else if (strcmp("yes", optarg) != 0) {
1714                                 printf("Invalid parameter %s. "
1715                                        "Specify --xattr=no or --xattr=yes\n",
1716                                        optarg);
1717                                 return -1;
1718                         }
1719                         break;
1720                 case 'z':
1721                         dryrun = 1;
1722                         break;
1723                 case 'c':
1724                         /* Undocumented option cl-clear */
1725                         if (strcmp("no", optarg) == 0) {
1726                                 noclear = 1;
1727                         } else if (strcmp("yes", optarg) != 0) {
1728                                 printf("Invalid parameter %s. "
1729                                        "Specify --cl-clear=no "
1730                                        "or --cl-clear=yes\n",
1731                                        optarg);
1732                                 return -1;
1733                         }
1734                         break;
1735                 case 'r':
1736                         /* Undocumented option use-rsync */
1737                         use_rsync = 1;
1738                         break;
1739                 case 'y':
1740                         /* Undocumented option rsync-threshold */
1741                         rsync_threshold = atol(optarg);
1742                         break;
1743                 case 'n':
1744                         /* Undocumented option start-recno */
1745                         status->ls_last_recno = atol(optarg);
1746                         break;
1747                 case 'd':
1748                         /* Undocumented option debug */
1749                         debug = atoi(optarg);
1750                         if (debug < 0 || debug > 2)
1751                                 debug = 0;
1752                         break;
1753                 case 'D':
1754                         /* Undocumented option debug log file */
1755                         if (debug_log != NULL)
1756                                 fclose(debug_log);
1757                         debug_log = fopen(optarg, "a");
1758                         if (debug_log == NULL) {
1759                                 printf("Cannot open %s for debug log\n",
1760                                        optarg);
1761                                 return -1;
1762                         }
1763                         break;
1764                 default:
1765                         fprintf(stderr, "error: %s: option '%s' "
1766                                 "unrecognized.\n", argv[0], argv[optind - 1]);
1767                         lr_usage();
1768                         return -1;
1769                 }
1770         }
1771
1772         if (status->ls_last_recno == -1)
1773                 status->ls_last_recno = 0;
1774         if (strnlen(status->ls_registration, LR_NAME_MAXLEN) == 0) {
1775                 /* No registration ID was passed in. */
1776                 printf("Please specify changelog consumer registration id.\n");
1777                 lr_usage();
1778                 return -1;
1779         }
1780         if (strnlen(status->ls_source, PATH_MAX) == 0) {
1781                 fprintf(stderr, "Please specify the source path.\n");
1782                 lr_usage();
1783                 return -1;
1784         }
1785         if (strnlen(status->ls_targets[0], PATH_MAX) == 0) {
1786                 fprintf(stderr, "Please specify the target path.\n");
1787                 lr_usage();
1788                 return -1;
1789         }
1790
1791         /* This plumbing is needed for some of the ioctls behind
1792            llapi calls to work. */
1793         if (obd_initialize(argc, argv) < 0) {
1794                 fprintf(stderr, "obd_initialize failed.\n");
1795                 exit(-1);
1796         }
1797
1798         rc = lr_locate_rsync();
1799         if (use_rsync && rc != 0) {
1800                 fprintf(stderr, "Error: unable to locate %s.\n", RSYNC);
1801                 exit(-1);
1802         }
1803
1804         signal(SIGINT, termination_handler);
1805         signal(SIGHUP, termination_handler);
1806         signal(SIGTERM, termination_handler);
1807
1808         rc = lr_replicate();
1809
1810         if (debug_log != NULL)
1811                 fclose(debug_log);
1812         return rc;
1813 }