Whamcloud - gitweb
LU-1302 llog: add env to llog_process functions
[fs/lustre-release.git] / lustre / obdclass / llog.c
index 658dbee..fdd4a33 100644 (file)
@@ -233,30 +233,27 @@ EXPORT_SYMBOL(llog_close);
 
 static int llog_process_thread(void *arg)
 {
-        struct llog_process_info     *lpi = (struct llog_process_info *)arg;
-        struct llog_handle           *loghandle = lpi->lpi_loghandle;
-        struct llog_log_hdr          *llh = loghandle->lgh_hdr;
-        struct llog_process_cat_data *cd  = lpi->lpi_catdata;
-        char                         *buf;
-        __u64                         cur_offset = LLOG_CHUNK_SIZE;
-        __u64                         last_offset;
-        int                           rc = 0, index = 1, last_index;
-        int                           saved_index = 0, last_called_index = 0;
+       struct llog_process_info        *lpi = arg;
+       struct llog_handle              *loghandle = lpi->lpi_loghandle;
+       struct llog_log_hdr             *llh = loghandle->lgh_hdr;
+       struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
+       char                            *buf;
+       __u64                            cur_offset = LLOG_CHUNK_SIZE;
+       __u64                            last_offset;
+       int                              rc = 0, index = 1, last_index;
+       int                              saved_index = 0;
+       int                              last_called_index = 0;
+
+       ENTRY;
 
         LASSERT(llh);
 
         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
         if (!buf) {
                 lpi->lpi_rc = -ENOMEM;
-#ifdef __KERNEL__
-                cfs_complete(&lpi->lpi_completion);
-#endif
-                return 0;
+               RETURN(0);
         }
 
-        if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
-                cfs_daemonize_ctxt("llog_process_thread");
-
         if (cd != NULL) {
                 last_called_index = cd->lpcd_first_idx;
                 index = cd->lpcd_first_idx + 1;
@@ -277,7 +274,7 @@ static int llog_process_thread(void *arg)
                 LASSERT(index <= last_index + 1);
                 if (index == last_index + 1)
                         break;
-
+repeat:
                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
                        index, last_index);
 
@@ -300,15 +297,19 @@ static int llog_process_thread(void *arg)
                                rec, rec->lrh_type);
 
                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
-                                lustre_swab_llog_rec(rec, NULL);
+                               lustre_swab_llog_rec(rec);
 
                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                                rec->lrh_type, rec->lrh_index);
 
-                        if (rec->lrh_index == 0)
-                                GOTO(out, 0); /* no more records */
-
-                        if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
+                       if (rec->lrh_index == 0) {
+                               /* probably another rec just got added? */
+                               if (index <= loghandle->lgh_last_idx)
+                                       GOTO(repeat, rc = 0);
+                               GOTO(out, rc = 0); /* no more records */
+                       }
+                       if (rec->lrh_len == 0 ||
+                           rec->lrh_len > LLOG_CHUNK_SIZE) {
                                 CWARN("invalid length %d in llog record for "
                                       "index %d/%d\n", rec->lrh_len,
                                       rec->lrh_index, index);
@@ -355,23 +356,46 @@ static int llog_process_thread(void *arg)
                 }
         }
 
- out:
+out:
         if (cd != NULL)
                 cd->lpcd_last_idx = last_called_index;
-        if (buf)
-                OBD_FREE(buf, LLOG_CHUNK_SIZE);
+
+       OBD_FREE(buf, LLOG_CHUNK_SIZE);
         lpi->lpi_rc = rc;
-#ifdef __KERNEL__
-        cfs_complete(&lpi->lpi_completion);
-#endif
         return 0;
 }
 
-int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
-                       void *data, void *catdata, int flags)
+#ifdef __KERNEL__
+static int llog_process_thread_daemonize(void *arg)
+{
+       struct llog_process_info        *lpi = arg;
+       struct lu_env                    env;
+       int                              rc;
+
+       cfs_daemonize_ctxt("llog_process_thread");
+
+       /* client env has no keys, tags is just 0 */
+       rc = lu_env_init(&env, LCT_LOCAL);
+       if (rc)
+               goto out;
+       lpi->lpi_env = &env;
+
+       rc = llog_process_thread(arg);
+
+       lu_env_fini(&env);
+out:
+       cfs_complete(&lpi->lpi_completion);
+       return rc;
+}
+#endif
+
+int llog_process_or_fork(const struct lu_env *env,
+                        struct llog_handle *loghandle,
+                        llog_cb_t cb, void *data, void *catdata, bool fork)
 {
         struct llog_process_info *lpi;
         int                      rc;
+
         ENTRY;
 
         OBD_ALLOC_PTR(lpi);
@@ -383,30 +407,39 @@ int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
         lpi->lpi_cb        = cb;
         lpi->lpi_cbdata    = data;
         lpi->lpi_catdata   = catdata;
-        lpi->lpi_flags     = flags;
 
 #ifdef __KERNEL__
-        cfs_init_completion(&lpi->lpi_completion);
-        rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                CERROR("cannot start thread: %d\n", rc);
-                OBD_FREE_PTR(lpi);
-                RETURN(rc);
-        }
-        cfs_wait_for_completion(&lpi->lpi_completion);
+       if (fork) {
+               /* The new thread can't use parent env,
+                * init the new one in llog_process_thread_daemonize. */
+               lpi->lpi_env = NULL;
+               cfs_init_completion(&lpi->lpi_completion);
+               rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
+                                      CFS_DAEMON_FLAGS);
+               if (rc < 0) {
+                       CERROR("%s: cannot start thread: rc = %d\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+                       OBD_FREE_PTR(lpi);
+                       RETURN(rc);
+               }
+               cfs_wait_for_completion(&lpi->lpi_completion);
+       } else {
+               lpi->lpi_env = env;
+               llog_process_thread(lpi);
+       }
 #else
-        llog_process_thread(lpi);
+       lpi->lpi_env = env;
+       llog_process_thread(lpi);
 #endif
         rc = lpi->lpi_rc;
         OBD_FREE_PTR(lpi);
         RETURN(rc);
 }
-EXPORT_SYMBOL(llog_process_flags);
 
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
-                 void *data, void *catdata)
+int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
+                llog_cb_t cb, void *data, void *catdata)
 {
-        return llog_process_flags(loghandle, cb, data, catdata, 0);
+       return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
 }
 EXPORT_SYMBOL(llog_process);
 
@@ -418,8 +451,9 @@ inline int llog_get_size(struct llog_handle *loghandle)
 }
 EXPORT_SYMBOL(llog_get_size);
 
-int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
-                         void *data, void *catdata)
+int llog_reverse_process(const struct lu_env *env,
+                        struct llog_handle *loghandle, llog_cb_t cb,
+                        void *data, void *catdata)
 {
         struct llog_log_hdr *llh = loghandle->lgh_hdr;
         struct llog_process_cat_data *cd = catdata;
@@ -458,29 +492,35 @@ int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
                         GOTO(out, rc);
 
                 rec = buf;
-                idx = le32_to_cpu(rec->lrh_index);
-                if (idx < index)
-                        CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
+               idx = rec->lrh_index;
+               CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
                 while (idx < index) {
-                        rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+                       rec = (void *)rec + rec->lrh_len;
+                       if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+                               lustre_swab_llog_rec(rec);
                         idx ++;
                 }
-                tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
+               LASSERT(idx == index);
+               tail = (void *)rec + rec->lrh_len - sizeof(*tail);
 
                 /* process records in buffer, starting where we found one */
                 while ((void *)tail > buf) {
-                        rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
-                                sizeof(*tail);
-
-                        if (rec->lrh_index == 0)
-                                GOTO(out, 0); /* no more records */
+                       if (tail->lrt_index == 0)
+                               GOTO(out, rc = 0); /* no more records */
 
                         /* if set, process the callback on this record */
                         if (ext2_test_bit(index, llh->llh_bitmap)) {
+                               rec = (void *)tail - tail->lrt_len +
+                                     sizeof(*tail);
+
                                 rc = cb(loghandle, rec, data);
                                 if (rc == LLOG_PROC_BREAK) {
                                         GOTO(out, rc);
-                                }
+                               } else if (rc == LLOG_DEL_RECORD) {
+                                       llog_cancel_rec(loghandle,
+                                                       tail->lrt_index);
+                                       rc = 0;
+                               }
                                 if (rc)
                                         GOTO(out, rc);
                         }
@@ -489,7 +529,7 @@ int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
                         --index;
                         if (index < first_index)
                                 GOTO(out, rc = 0);
-                        tail = (void *)rec - sizeof(*tail);
+                       tail = (void *)tail - tail->lrt_len;
                 }
         }