Whamcloud - gitweb
LU-1302 llog: introduce llog_open
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
index 3bbb801..50172ff 100644 (file)
@@ -49,7 +49,6 @@
 #endif
 
 #include <obd_class.h>
-#include <lustre_log.h>
 
 #include "llog_internal.h"
 
@@ -81,7 +80,8 @@ static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                 RETURN(ERR_PTR(-ENOSPC));
 
-       rc = llog_create(env, cathandle->lgh_ctxt, &loghandle, NULL, NULL);
+       rc = llog_open_create(env, cathandle->lgh_ctxt, &loghandle, NULL,
+                             NULL);
        if (rc)
                RETURN(ERR_PTR(rc));
 
@@ -147,53 +147,58 @@ out_destroy:
 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                       struct llog_handle **res, struct llog_logid *logid)
 {
-        struct llog_handle *loghandle;
-        int rc = 0;
-        ENTRY;
-
-        if (cathandle == NULL)
-                RETURN(-EBADF);
+       struct llog_handle      *loghandle;
+       int                      rc = 0;
 
-        cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
-                                u.phd.phd_entry) {
-                struct llog_logid *cgl = &loghandle->lgh_id;
+       ENTRY;
 
-                if (cgl->lgl_oid == logid->lgl_oid) {
-                        if (cgl->lgl_ogen != logid->lgl_ogen) {
-                                CERROR("log "LPX64" generation %x != %x\n",
-                                       logid->lgl_oid, cgl->lgl_ogen,
-                                       logid->lgl_ogen);
-                                continue;
-                        }
-                        loghandle->u.phd.phd_cat_handle = cathandle;
-                        GOTO(out, rc = 0);
-                }
-        }
+       if (cathandle == NULL)
+               RETURN(-EBADF);
+
+       cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
+                               u.phd.phd_entry) {
+               struct llog_logid *cgl = &loghandle->lgh_id;
+
+               if (cgl->lgl_oid == logid->lgl_oid) {
+                       if (cgl->lgl_ogen != logid->lgl_ogen) {
+                               CERROR("%s: log "LPX64" generation %x != %x\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      logid->lgl_oid, cgl->lgl_ogen,
+                                      logid->lgl_ogen);
+                               continue;
+                       }
+                       loghandle->u.phd.phd_cat_handle = cathandle;
+                       GOTO(out, rc = 0);
+               }
+       }
 
-       rc = llog_create(env, cathandle->lgh_ctxt, &loghandle, logid, NULL);
-       if (rc) {
-               CERROR("error opening log id "LPX64":%x: rc %d\n",
+       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               CERROR("%s: error opening log id "LPX64":%x: rc = %d\n",
+                      cathandle->lgh_ctxt->loc_obd->obd_name,
                       logid->lgl_oid, logid->lgl_ogen, rc);
-       } else {
-               rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
-                if (!rc) {
-                        cfs_list_add(&loghandle->u.phd.phd_entry,
-                                     &cathandle->u.chd.chd_head);
-                }
-        }
-        if (!rc) {
-                loghandle->u.phd.phd_cat_handle = cathandle;
-                loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-                loghandle->u.phd.phd_cookie.lgc_index =
-                        loghandle->lgh_hdr->llh_cat_idx;
-        }
+               GOTO(out, rc);
+       }
 
+       rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
+       if (rc < 0) {
+               llog_close(env, loghandle);
+               GOTO(out, rc);
+       }
+
+       cfs_list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+
+       loghandle->u.phd.phd_cat_handle = cathandle;
+       loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
+       loghandle->u.phd.phd_cookie.lgc_index = loghandle->lgh_hdr->llh_cat_idx;
+       EXIT;
 out:
-        *res = loghandle;
-        RETURN(rc);
+       *res = loghandle;
+       return rc;
 }
 
-int llog_cat_put(const struct lu_env *env, struct llog_handle *cathandle)
+int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
 {
        struct llog_handle      *loghandle, *n;
        int                      rc;
@@ -202,14 +207,42 @@ int llog_cat_put(const struct lu_env *env, struct llog_handle *cathandle)
 
        cfs_list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
                                     u.phd.phd_entry) {
-               int err = llog_close(env, loghandle);
-               if (err)
-                       CERROR("error closing loghandle\n");
+               struct llog_log_hdr     *llh = loghandle->lgh_hdr;
+               int                      index;
+
+               /* unlink open-not-created llogs */
+               cfs_list_del_init(&loghandle->u.phd.phd_entry);
+               llh = loghandle->lgh_hdr;
+               if (loghandle->lgh_obj != NULL && llh != NULL &&
+                   (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+                   (llh->llh_count == 1)) {
+                       rc = llog_destroy(env, loghandle);
+                       if (rc)
+                               CERROR("%s: failure destroying log during "
+                                      "cleanup: rc = %d\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      rc);
+
+                       index = loghandle->u.phd.phd_cookie.lgc_index;
+
+                       LASSERT(index);
+                       llog_cat_set_first_idx(cathandle, index);
+                       rc = llog_cancel_rec(env, cathandle, index);
+                       if (rc == 0)
+                               CDEBUG(D_RPCTRACE,
+                                      "cancel plain log at index %u of "
+                                      "catalog "LPX64"\n",
+                                      index, cathandle->lgh_id.lgl_oid);
+               }
+               llog_close(env, loghandle);
        }
+       /* if handle was stored in ctxt, remove it too */
+       if (cathandle->lgh_ctxt->loc_handle == cathandle)
+               cathandle->lgh_ctxt->loc_handle = NULL;
        rc = llog_close(env, cathandle);
        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_cat_put);
+EXPORT_SYMBOL(llog_cat_close);
 
 /**
  * lockdep markers for nested struct llog_handle::lgh_lock locking.
@@ -331,13 +364,14 @@ int llog_cat_cancel_records(const struct lu_env *env,
                            struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies)
 {
-        int i, index, rc = 0;
-        ENTRY;
+       int i, index, rc = 0;
 
-        cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
-        for (i = 0; i < count; i++, cookies++) {
-                struct llog_handle *loghandle;
-                struct llog_logid *lgl = &cookies->lgc_lgl;
+       ENTRY;
+
+       cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+       for (i = 0; i < count; i++, cookies++) {
+               struct llog_handle      *loghandle;
+               struct llog_logid       *lgl = &cookies->lgc_lgl;
 
                rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
                if (rc) {
@@ -345,18 +379,18 @@ int llog_cat_cancel_records(const struct lu_env *env,
                        break;
                }
 
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                rc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
-                cfs_up_write(&loghandle->lgh_lock);
+               cfs_up_write(&loghandle->lgh_lock);
 
-                if (rc == 1) {          /* log has been destroyed */
-                        index = loghandle->u.phd.phd_cookie.lgc_index;
-                        if (cathandle->u.chd.chd_current_log == loghandle)
-                                cathandle->u.chd.chd_current_log = NULL;
-                        llog_free_handle(loghandle);
+               if (rc == 1) {          /* log has been destroyed */
+                       index = loghandle->u.phd.phd_cookie.lgc_index;
+                       if (cathandle->u.chd.chd_current_log == loghandle)
+                               cathandle->u.chd.chd_current_log = NULL;
+                       llog_close(env, loghandle);
 
-                        LASSERT(index);
-                        llog_cat_set_first_idx(cathandle, index);
+                       LASSERT(index);
+                       llog_cat_set_first_idx(cathandle, index);
                        rc = llog_cancel_rec(env, cathandle, index);
                         if (rc == 0)
                                 CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
@@ -403,11 +437,13 @@ int llog_cat_process_cb(const struct lu_env *env, struct llog_handle *cat_llh,
 
                 cd.lpcd_first_idx = d->lpd_startidx;
                 cd.lpcd_last_idx = 0;
-               rc = llog_process(env, llh, d->lpd_cb, d->lpd_data, &cd);
+               rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
+                                         &cd, false);
                 /* Continue processing the next log from idx 0 */
                 d->lpd_startidx = 0;
         } else {
-               rc = llog_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
+               rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
+                                         NULL, false);
         }
 
         RETURN(rc);
@@ -484,41 +520,47 @@ int llog_cat_process_thread(void *data)
        LASSERT(lgi);
 
        lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg);
-       rc = llog_create(&env, ctxt, &llh, &lgi->lgi_logid, NULL);
-        if (rc) {
-                CERROR("llog_create() failed %d\n", rc);
+       rc = llog_open(&env, ctxt, &llh, &lgi->lgi_logid, NULL,
+                      LLOG_OPEN_EXISTS);
+       if (rc) {
+               CERROR("%s: cannot open llog "LPX64":%x: rc = %d\n",
+                      ctxt->loc_obd->obd_name, lgi->lgi_logid.lgl_oid,
+                      lgi->lgi_logid.lgl_ogen, rc);
                GOTO(out_env, rc);
-        }
+       }
        rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL);
-        if (rc) {
-                CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(release_llh, rc);
-        }
+       if (rc) {
+               CERROR("%s: llog_init_handle failed: rc = %d\n",
+                      llh->lgh_ctxt->loc_obd->obd_name, rc);
+               GOTO(release_llh, rc);
+       }
 
-        if (cb) {
+       if (cb) {
                rc = llog_cat_process(&env, llh, cb, NULL, 0, 0);
                if (rc != LLOG_PROC_BREAK && rc != 0)
-                       CERROR("llog_cat_process() failed %d\n", rc);
+                       CERROR("%s: llog_cat_process() failed: rc = %d\n",
+                              llh->lgh_ctxt->loc_obd->obd_name, rc);
                cb(&env, llh, NULL, NULL);
-        } else {
-                CWARN("No callback function for recovery\n");
-        }
+       } else {
+               CWARN("No callback function for recovery\n");
+       }
 
-        /*
-         * Make sure that all cached data is sent.
-         */
+       /*
+        * Make sure that all cached data is sent.
+        */
        llog_sync(ctxt, NULL, 0);
-        GOTO(release_llh, rc);
+       GOTO(release_llh, rc);
 release_llh:
-       rc = llog_cat_put(&env, llh);
-        if (rc)
-                CERROR("llog_cat_put() failed %d\n", rc);
+       rc = llog_cat_close(&env, llh);
+       if (rc)
+               CERROR("%s: llog_cat_close() failed: rc = %d\n",
+                      llh->lgh_ctxt->loc_obd->obd_name, rc);
 out_env:
        lu_env_fini(&env);
 out:
-        llog_ctxt_put(ctxt);
-        OBD_FREE_PTR(args);
-        return rc;
+       llog_ctxt_put(ctxt);
+       OBD_FREE_PTR(args);
+       return rc;
 }
 EXPORT_SYMBOL(llog_cat_process_thread);
 #endif
@@ -665,7 +707,7 @@ int cat_cancel_cb(const struct lu_env *env, struct llog_handle *cathandle,
                               loghandle->lgh_ctxt->loc_obd->obd_name, rc);
 
                index = loghandle->u.phd.phd_cookie.lgc_index;
-               llog_free_handle(loghandle);
+               llog_close(env, loghandle);
 
 cat_cleanup:
                LASSERT(index);