Whamcloud - gitweb
LU-6714 llog: fix wrong offset in llog_process_thread()
[fs/lustre-release.git] / lustre / obdclass / llog.c
index 2f3b9e4..56413ef 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -47,7 +47,7 @@
 
 #define DEBUG_SUBSYSTEM S_LOG
 
-
+#include <linux/kthread.h>
 #include <obd_class.h>
 #include <lustre_log.h>
 #include "llog_internal.h"
@@ -56,7 +56,7 @@
  * Allocate a new log or catalog handle
  * Used inside llog_open().
  */
-struct llog_handle *llog_alloc_handle(void)
+static struct llog_handle *llog_alloc_handle(void)
 {
        struct llog_handle *loghandle;
 
@@ -65,7 +65,7 @@ struct llog_handle *llog_alloc_handle(void)
                return NULL;
 
        init_rwsem(&loghandle->lgh_lock);
-       spin_lock_init(&loghandle->lgh_hdr_lock);
+       init_rwsem(&loghandle->lgh_hdr_lock);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        atomic_set(&loghandle->lgh_refcount, 1);
 
@@ -75,20 +75,19 @@ struct llog_handle *llog_alloc_handle(void)
 /*
  * Free llog handle and header data if exists. Used in llog_close() only
  */
-void llog_free_handle(struct llog_handle *loghandle)
+static void llog_free_handle(struct llog_handle *loghandle)
 {
        LASSERT(loghandle != NULL);
 
        /* failed llog_init_handle */
-       if (!loghandle->lgh_hdr)
+       if (loghandle->lgh_hdr == NULL)
                goto out;
 
        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
                LASSERT(list_empty(&loghandle->u.phd.phd_entry));
        else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
                LASSERT(list_empty(&loghandle->u.chd.chd_head));
-       LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
-       OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+       OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
 out:
        OBD_FREE_PTR(loghandle);
 }
@@ -121,9 +120,9 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                 RETURN(-EINVAL);
         }
 
-       spin_lock(&loghandle->lgh_hdr_lock);
-       if (!ext2_clear_bit(index, llh->llh_bitmap)) {
-               spin_unlock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
+       if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
+               up_write(&loghandle->lgh_hdr_lock);
                CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
                RETURN(-ENOENT);
        }
@@ -132,8 +131,8 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
-           (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
-               spin_unlock(&loghandle->lgh_hdr_lock);
+           (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
+               up_write(&loghandle->lgh_hdr_lock);
                rc = llog_destroy(env, loghandle);
                if (rc < 0) {
                        CERROR("%s: can't destroy empty llog #"DOSTID
@@ -145,7 +144,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                }
                RETURN(LLOG_DEL_PLAIN);
        }
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
 
        rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX);
        if (rc < 0) {
@@ -158,13 +157,12 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        }
        RETURN(0);
 out_err:
-       spin_lock(&loghandle->lgh_hdr_lock);
-       ext2_set_bit(index, llh->llh_bitmap);
+       down_write(&loghandle->lgh_hdr_lock);
+       ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
        llh->llh_count++;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
        return rc;
 }
-EXPORT_SYMBOL(llog_cancel_rec);
 
 static int llog_read_header(const struct lu_env *env,
                            struct llog_handle *handle,
@@ -184,17 +182,22 @@ static int llog_read_header(const struct lu_env *env,
        if (rc == LLOG_EEMPTY) {
                struct llog_log_hdr *llh = handle->lgh_hdr;
 
+               /* lrh_len should be initialized in llog_init_handle */
                handle->lgh_last_idx = 0; /* header is record with index 0 */
                llh->llh_count = 1;         /* for the header record */
                llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
-               llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
-               llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
+               LASSERT(handle->lgh_ctxt->loc_chunk_size >=
+                                               LLOG_MIN_CHUNK_SIZE);
+               llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
+               llh->llh_hdr.lrh_index = 0;
                llh->llh_timestamp = cfs_time_current_sec();
                if (uuid)
                        memcpy(&llh->llh_tgtuuid, uuid,
                               sizeof(llh->llh_tgtuuid));
                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
-               ext2_set_bit(0, llh->llh_bitmap);
+               ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
+               LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
+               LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
                rc = 0;
        }
        return rc;
@@ -206,14 +209,18 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
        struct llog_log_hdr     *llh;
        enum llog_flag           fmt = flags & LLOG_F_EXT_MASK;
        int                      rc;
-
+       int                     chunk_size = handle->lgh_ctxt->loc_chunk_size;
        ENTRY;
+
        LASSERT(handle->lgh_hdr == NULL);
 
-       OBD_ALLOC_PTR(llh);
+       LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
+       OBD_ALLOC_LARGE(llh, chunk_size);
        if (llh == NULL)
                RETURN(-ENOMEM);
+
        handle->lgh_hdr = llh;
+       handle->lgh_hdr_size = chunk_size;
        /* first assign flags to use llog_client_ops */
        llh->llh_flags = flags;
        rc = llog_read_header(env, handle, uuid);
@@ -262,7 +269,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
        llh->llh_flags |= fmt;
 out:
        if (rc) {
-               OBD_FREE_PTR(llh);
+               OBD_FREE_LARGE(llh, chunk_size);
                handle->lgh_hdr = NULL;
        }
        RETURN(rc);
@@ -276,106 +283,141 @@ static int llog_process_thread(void *arg)
        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
-       __u64                            cur_offset = LLOG_CHUNK_SIZE;
-       __u64                            last_offset;
+       size_t                           chunk_size;
+       __u64                            cur_offset;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
        int                              last_called_index = 0;
 
        ENTRY;
 
-        LASSERT(llh);
+       if (llh == NULL)
+               RETURN(-EINVAL);
 
-        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf) {
-                lpi->lpi_rc = -ENOMEM;
-               RETURN(0);
-        }
+       cur_offset = chunk_size = llh->llh_hdr.lrh_len;
+       /* expect chunk_size to be power of two */
+       LASSERT(is_power_of_2(chunk_size));
 
-        if (cd != NULL) {
-                last_called_index = cd->lpcd_first_idx;
-                index = cd->lpcd_first_idx + 1;
-        }
-        if (cd != NULL && cd->lpcd_last_idx)
-                last_index = cd->lpcd_last_idx;
-        else
-                last_index = LLOG_BITMAP_BYTES * 8 - 1;
+       OBD_ALLOC_LARGE(buf, chunk_size);
+       if (buf == NULL) {
+               lpi->lpi_rc = -ENOMEM;
+               RETURN(0);
+       }
 
-       if (index > last_index) {
-               /* Record is not in this buffer. */
-               GOTO(out, rc);
+       if (cd != NULL) {
+               last_called_index = cd->lpcd_first_idx;
+               index = cd->lpcd_first_idx + 1;
        }
+       if (cd != NULL && cd->lpcd_last_idx)
+               last_index = cd->lpcd_last_idx;
+       else
+               last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
 
-        while (rc == 0) {
-                struct llog_rec_hdr *rec;
+       while (rc == 0) {
+               struct llog_rec_hdr *rec;
+               off_t chunk_offset;
+               unsigned int buf_offset = 0;
+               bool partial_chunk;
 
-                /* skip records not set in bitmap */
-                while (index <= last_index &&
-                       !ext2_test_bit(index, llh->llh_bitmap))
-                        ++index;
+               /* skip records not set in bitmap */
+               while (index <= last_index &&
+                      !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+                       ++index;
 
-                LASSERT(index <= last_index + 1);
-                if (index == last_index + 1)
-                        break;
-repeat:
-                CDEBUG(D_OTHER, "index: %d last_index %d\n",
-                       index, last_index);
+               /* There are no indices prior the last_index */
+               if (index > last_index)
+                       break;
 
-                /* get the buf with our target record; avoid old garbage */
-                memset(buf, 0, LLOG_CHUNK_SIZE);
-                last_offset = cur_offset;
+               CDEBUG(D_OTHER, "index: %d last_index %d\n", index,
+                      last_index);
+
+repeat:
+               /* get the buf with our target record; avoid old garbage */
+               memset(buf, 0, chunk_size);
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
-                                    index, &cur_offset, buf, LLOG_CHUNK_SIZE);
-                if (rc)
-                        GOTO(out, rc);
+                                    index, &cur_offset, buf, chunk_size);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* NB: after llog_next_block() call the cur_offset is the
+                * offset of the next block after read one.
+                * The absolute offset of the current chunk is calculated
+                * from cur_offset value and stored in chunk_offset variable.
+                */
+               if (cur_offset % chunk_size != 0) {
+                       partial_chunk = true;
+                       chunk_offset = cur_offset & ~(chunk_size - 1);
+               } else {
+                       partial_chunk = false;
+                       chunk_offset = cur_offset - chunk_size;
+               }
 
                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop. */
-               for (rec = (struct llog_rec_hdr *)buf;
-                    (char *)rec < buf + LLOG_CHUNK_SIZE;
+               for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
+                    (char *)rec < buf + chunk_size;
                     rec = llog_rec_hdr_next(rec)) {
 
                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
                               rec, rec->lrh_type);
 
-                        if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+                       if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                                lustre_swab_llog_rec(rec);
 
-                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
-                               rec->lrh_type, rec->lrh_index);
+                       CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
+                              rec->lrh_type, rec->lrh_index);
+
+                       /* for partial chunk the end of it is zeroed, check
+                        * for index 0 to distinguish it. */
+                       if (partial_chunk && rec->lrh_index == 0) {
+                               /* concurrent llog_add() might add new records
+                                * while llog_processing, check this is not
+                                * the case and re-read the current chunk
+                                * otherwise. */
+                               if (index > loghandle->lgh_last_idx)
+                                       GOTO(out, rc = 0);
+                               CDEBUG(D_OTHER, "Re-read last llog buffer for "
+                                      "new records, index %u, last %u\n",
+                                      index, loghandle->lgh_last_idx);
+                               /* save offset inside buffer for the re-read */
+                               buf_offset = (char *)rec - (char *)buf;
+                               cur_offset = chunk_offset;
+                               goto repeat;
+                       }
 
-                       if (rec->lrh_index == 0) {
-                               /* probably another rec just got added? */
-                               if (index <= loghandle->lgh_last_idx)
-                                       GOTO(repeat, rc = 0);
-                               GOTO(out, rc = 0); /* no more records */
+                       if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
+                               CWARN("invalid length %d in llog record for "
+                                     "index %d/%d\n", rec->lrh_len,
+                                     rec->lrh_index, index);
+                               GOTO(out, rc = -EINVAL);
                        }
-                       if (rec->lrh_len == 0 ||
-                           rec->lrh_len > LLOG_CHUNK_SIZE) {
-                                CWARN("invalid length %d in llog record for "
-                                      "index %d/%d\n", rec->lrh_len,
-                                      rec->lrh_index, index);
-                                GOTO(out, rc = -EINVAL);
-                        }
 
-                        if (rec->lrh_index < index) {
-                                CDEBUG(D_OTHER, "skipping lrh_index %d\n",
-                                       rec->lrh_index);
-                                continue;
-                        }
+                       if (rec->lrh_index < index) {
+                               CDEBUG(D_OTHER, "skipping lrh_index %d\n",
+                                      rec->lrh_index);
+                               continue;
+                       }
+
+                       if (rec->lrh_index != index) {
+                               CERROR("%s: Invalid record: index %u but "
+                                      "expected %u\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      rec->lrh_index, index);
+                               GOTO(out, rc = -ERANGE);
+                       }
 
-                        CDEBUG(D_OTHER,
-                               "lrh_index: %d lrh_len: %d (%d remains)\n",
-                               rec->lrh_index, rec->lrh_len,
-                               (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
+                       CDEBUG(D_OTHER,
+                              "lrh_index: %d lrh_len: %d (%d remains)\n",
+                              rec->lrh_index, rec->lrh_len,
+                              (int)(buf + chunk_size - (char *)rec));
 
-                        loghandle->lgh_cur_idx = rec->lrh_index;
-                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
-                                                    last_offset;
+                       loghandle->lgh_cur_idx = rec->lrh_index;
+                       loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
+                                                   chunk_offset;
 
-                        /* if set, process the callback on this record */
-                        if (ext2_test_bit(index, llh->llh_bitmap)) {
+                       /* if set, process the callback on this record */
+                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
                                                 lpi->lpi_cbdata);
                                last_called_index = index;
@@ -385,23 +427,20 @@ repeat:
                                        rc = llog_cancel_rec(lpi->lpi_env,
                                                             loghandle,
                                                             rec->lrh_index);
-                                }
-                                if (rc)
-                                        GOTO(out, rc);
-                        } else {
-                                CDEBUG(D_OTHER, "Skipped index %d\n", index);
-                        }
-
-                        /* next record, still in buffer? */
-                        ++index;
-                        if (index > last_index)
-                                GOTO(out, rc = 0);
-                }
-        }
+                               }
+                               if (rc)
+                                       GOTO(out, rc);
+                       }
+                       /* exit if the last index is reached */
+                       if (index >= last_index)
+                               GOTO(out, rc = 0);
+                       ++index;
+               }
+       }
 
 out:
-        if (cd != NULL)
-                cd->lpcd_last_idx = last_called_index;
+       if (cd != NULL)
+               cd->lpcd_last_idx = last_called_index;
 
        if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
                /* something bad happened to the processing of a local
@@ -410,14 +449,14 @@ out:
                 * remaining bits in the header */
                CERROR("Local llog found corrupted\n");
                while (index <= last_index) {
-                       if (ext2_test_bit(index, llh->llh_bitmap) != 0)
+                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
                                llog_cancel_rec(lpi->lpi_env, loghandle, index);
                        index++;
                }
                rc = 0;
        }
 
-       OBD_FREE(buf, LLOG_CHUNK_SIZE);
+       OBD_FREE_LARGE(buf, chunk_size);
         lpi->lpi_rc = rc;
         return 0;
 }
@@ -508,36 +547,36 @@ int llog_reverse_process(const struct lu_env *env,
         struct llog_process_cat_data *cd = catdata;
         void *buf;
         int rc = 0, first_index = 1, index, idx;
+       __u32   chunk_size = llh->llh_hdr.lrh_len;
         ENTRY;
 
-        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf)
-                RETURN(-ENOMEM);
-
-        if (cd != NULL)
-                first_index = cd->lpcd_first_idx + 1;
-        if (cd != NULL && cd->lpcd_last_idx)
-                index = cd->lpcd_last_idx;
-        else
-                index = LLOG_BITMAP_BYTES * 8 - 1;
-
-        while (rc == 0) {
-                struct llog_rec_hdr *rec;
-                struct llog_rec_tail *tail;
-
-                /* skip records not set in bitmap */
-                while (index >= first_index &&
-                       !ext2_test_bit(index, llh->llh_bitmap))
-                        --index;
-
-                LASSERT(index >= first_index - 1);
-                if (index == first_index - 1)
-                        break;
+       OBD_ALLOC_LARGE(buf, chunk_size);
+       if (buf == NULL)
+               RETURN(-ENOMEM);
 
-                /* get the buf with our target record; avoid old garbage */
-                memset(buf, 0, LLOG_CHUNK_SIZE);
-               rc = llog_prev_block(env, loghandle, index, buf,
-                                    LLOG_CHUNK_SIZE);
+       if (cd != NULL)
+               first_index = cd->lpcd_first_idx + 1;
+       if (cd != NULL && cd->lpcd_last_idx)
+               index = cd->lpcd_last_idx;
+       else
+               index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+
+       while (rc == 0) {
+               struct llog_rec_hdr *rec;
+               struct llog_rec_tail *tail;
+
+               /* skip records not set in bitmap */
+               while (index >= first_index &&
+                      !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+                       --index;
+
+               LASSERT(index >= first_index - 1);
+               if (index == first_index - 1)
+                       break;
+
+               /* get the buf with our target record; avoid old garbage */
+               memset(buf, 0, chunk_size);
+               rc = llog_prev_block(env, loghandle, index, buf, chunk_size);
                if (rc)
                        GOTO(out, rc);
 
@@ -553,13 +592,13 @@ int llog_reverse_process(const struct lu_env *env,
                LASSERT(idx == index);
                tail = (void *)rec + rec->lrh_len - sizeof(*tail);
 
-                /* process records in buffer, starting where we found one */
-                while ((void *)tail > buf) {
+               /* process records in buffer, starting where we found one */
+               while ((void *)tail > buf) {
                        if (tail->lrt_index == 0)
                                GOTO(out, rc = 0); /* no more records */
 
-                        /* if set, process the callback on this record */
-                        if (ext2_test_bit(index, llh->llh_bitmap)) {
+                       /* if set, process the callback on this record */
+                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                rec = (void *)tail - tail->lrt_len +
                                      sizeof(*tail);
 
@@ -583,8 +622,8 @@ int llog_reverse_process(const struct lu_env *env,
         }
 
 out:
-        if (buf)
-                OBD_FREE(buf, LLOG_CHUNK_SIZE);
+       if (buf != NULL)
+               OBD_FREE_LARGE(buf, chunk_size);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_reverse_process);
@@ -643,7 +682,6 @@ int llog_declare_create(const struct lu_env *env,
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_declare_create);
 
 int llog_create(const struct lu_env *env, struct llog_handle *handle,
                struct thandle *th)
@@ -667,7 +705,6 @@ int llog_create(const struct lu_env *env, struct llog_handle *handle,
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_create);
 
 int llog_declare_write_rec(const struct lu_env *env,
                           struct llog_handle *handle,
@@ -694,7 +731,6 @@ int llog_declare_write_rec(const struct lu_env *env,
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_declare_write_rec);
 
 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
                   struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
@@ -724,7 +760,6 @@ int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_write_rec);
 
 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
             struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
@@ -796,6 +831,14 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
+       /* Create the remote update llog object synchronously, which
+        * happens during inialization process see lod_sub_prep_llog(),
+        * to make sure the update llog object is created before
+        * corss-MDT writing updates into the llog object */
+       if (dt_object_remote((*res)->lgh_obj))
+               th->th_sync = 1;
+
+       th->th_wait_submit = 1;
        rc = llog_declare_create(env, *res, th);
        if (rc == 0) {
                rc = dt_trans_start_local(env, d, th);
@@ -868,6 +911,7 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
        if (rc)
                GOTO(out_trans, rc);
 
+       th->th_wait_submit = 1;
        rc = dt_trans_start_local(env, dt, th);
        if (rc)
                GOTO(out_trans, rc);
@@ -983,7 +1027,6 @@ int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
        /* Append all records */
        return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
 }
-EXPORT_SYMBOL(llog_copy_handler);
 
 /* backup plain llog */
 int llog_backup(const struct lu_env *env, struct obd_device *obd,