Whamcloud - gitweb
LU-1302 llog: modify llog_write/llog_add to support OSD
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
index 50172ff..a3c5873 100644 (file)
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Intel, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -40,6 +42,8 @@
  *   if an OST or MDS fails it need only look at log(s) relevant to itself
  *
  * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOG
  *
  * Assumes caller has already pushed us into the kernel context and is locking.
  */
-static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
-                                           struct llog_handle *cathandle)
+static int llog_cat_new_log(const struct lu_env *env,
+                           struct llog_handle *cathandle,
+                           struct llog_handle *loghandle,
+                           struct thandle *th)
 {
-        struct llog_handle *loghandle;
+
         struct llog_log_hdr *llh;
         struct llog_logid_rec rec = { { 0 }, };
         int rc, index, bitmap_size;
@@ -74,16 +80,21 @@ static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
         /* maximum number of available slots in catlog is bitmap_size - 2 */
         if (llh->llh_cat_idx == index) {
                 CERROR("no free catalog slots for log...\n");
-                RETURN(ERR_PTR(-ENOSPC));
-        }
+               RETURN(-ENOSPC);
+       }
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
-                RETURN(ERR_PTR(-ENOSPC));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
+               RETURN(-ENOSPC);
 
-       rc = llog_open_create(env, cathandle->lgh_ctxt, &loghandle, NULL,
-                             NULL);
-       if (rc)
-               RETURN(ERR_PTR(rc));
+       rc = llog_create(env, loghandle, th);
+       /* if llog is already created, no need to initialize it */
+       if (rc == -EEXIST) {
+               RETURN(0);
+       } else if (rc != 0) {
+               CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
+                      loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+               RETURN(rc);
+       }
 
        rc = llog_init_handle(env, loghandle,
                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
@@ -120,22 +131,15 @@ static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
 
         /* update the catalog: header and record */
        rc = llog_write_rec(env, cathandle, &rec.lid_hdr,
-                            &loghandle->u.phd.phd_cookie, 1, NULL, index);
-        if (rc < 0) {
-                GOTO(out_destroy, rc);
-        }
-
-        loghandle->lgh_hdr->llh_cat_idx = index;
-        cathandle->u.chd.chd_current_log = loghandle;
-        LASSERT(cfs_list_empty(&loghandle->u.phd.phd_entry));
-        cfs_list_add_tail(&loghandle->u.phd.phd_entry,
-                          &cathandle->u.chd.chd_head);
-
-out_destroy:
+                           &loghandle->u.phd.phd_cookie, 1, NULL, index, th);
        if (rc < 0)
-               llog_destroy(env, loghandle);
+               GOTO(out_destroy, rc);
 
-       RETURN(loghandle);
+       loghandle->lgh_hdr->llh_cat_idx = index;
+       RETURN(0);
+out_destroy:
+       llog_destroy(env, loghandle);
+       RETURN(rc);
 }
 
 /* Open an existent log handle and add it to the open list.
@@ -155,6 +159,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
        if (cathandle == NULL)
                RETURN(-EBADF);
 
+       cfs_down_write(&cathandle->lgh_lock);
        cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
                                u.phd.phd_entry) {
                struct llog_logid *cgl = &loghandle->lgh_id;
@@ -168,9 +173,11 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                                continue;
                        }
                        loghandle->u.phd.phd_cat_handle = cathandle;
+                       cfs_up_write(&cathandle->lgh_lock);
                        GOTO(out, rc = 0);
                }
        }
+       cfs_up_write(&cathandle->lgh_lock);
 
        rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
                       LLOG_OPEN_EXISTS);
@@ -187,11 +194,14 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                GOTO(out, rc);
        }
 
+       cfs_down_write(&cathandle->lgh_lock);
        cfs_list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+       cfs_up_write(&cathandle->lgh_lock);
 
        loghandle->u.phd.phd_cat_handle = cathandle;
        loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-       loghandle->u.phd.phd_cookie.lgc_index = loghandle->lgh_hdr->llh_cat_idx;
+       loghandle->u.phd.phd_cookie.lgc_index =
+                               loghandle->lgh_hdr->llh_cat_idx;
        EXIT;
 out:
        *res = loghandle;
@@ -262,9 +272,8 @@ enum {
  *
  * NOTE: loghandle is write-locked upon successful return
  */
-static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
-                                               struct llog_handle *cathandle,
-                                               int create)
+static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
+                                               struct thandle *th)
 {
         struct llog_handle *loghandle = NULL;
         ENTRY;
@@ -272,33 +281,31 @@ static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
         cfs_down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
-                struct llog_log_hdr *llh = loghandle->lgh_hdr;
+               struct llog_log_hdr *llh;
 
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-                if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
+               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               llh = loghandle->lgh_hdr;
+               if (llh == NULL ||
+                   loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
                         cfs_up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
                         cfs_up_write(&loghandle->lgh_lock);
                 }
         }
-        if (!create) {
-                if (loghandle)
-                        cfs_down_write(&loghandle->lgh_lock);
-                cfs_up_read(&cathandle->lgh_lock);
-                RETURN(loghandle);
-        }
         cfs_up_read(&cathandle->lgh_lock);
 
-        /* time to create new log */
+       /* time to use next log */
 
-        /* first, we have to make sure the state hasn't changed */
-        cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
-        loghandle = cathandle->u.chd.chd_current_log;
-        if (loghandle) {
-                struct llog_log_hdr *llh = loghandle->lgh_hdr;
+       /* first, we have to make sure the state hasn't changed */
+       cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+       loghandle = cathandle->u.chd.chd_current_log;
+       if (loghandle) {
+               struct llog_log_hdr *llh;
 
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               llh = loghandle->lgh_hdr;
+               LASSERT(llh);
                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
                         cfs_up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
@@ -307,12 +314,15 @@ static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
                 }
         }
 
-        CDEBUG(D_INODE, "creating new log\n");
-       loghandle = llog_cat_new_log(env, cathandle);
-        if (!IS_ERR(loghandle))
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-        cfs_up_write(&cathandle->lgh_lock);
-        RETURN(loghandle);
+       CDEBUG(D_INODE, "use next log\n");
+
+       loghandle = cathandle->u.chd.chd_next_log;
+       cathandle->u.chd.chd_current_log = loghandle;
+       cathandle->u.chd.chd_next_log = NULL;
+       cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+       cfs_up_write(&cathandle->lgh_lock);
+       LASSERT(loghandle);
+       RETURN(loghandle);
 }
 
 /* Add a single record to the recovery log(s) using a catalog
@@ -322,35 +332,160 @@ static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
  */
 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                     struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
-                    void *buf)
+                    void *buf, struct thandle *th)
 {
         struct llog_handle *loghandle;
         int rc;
         ENTRY;
 
         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
-       loghandle = llog_cat_current_log(env, cathandle, 1);
-       if (IS_ERR(loghandle))
-               RETURN(PTR_ERR(loghandle));
+       loghandle = llog_cat_current_log(cathandle, th);
+       LASSERT(!IS_ERR(loghandle));
+
        /* loghandle is already locked by llog_cat_current_log() for us */
-       rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, -1);
+       if (!llog_exist(loghandle)) {
+               rc = llog_cat_new_log(env, cathandle, loghandle, th);
+               if (rc < 0) {
+                       cfs_up_write(&loghandle->lgh_lock);
+                       RETURN(rc);
+               }
+       }
+       /* now let's try to add the record */
+       rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, -1, th);
         if (rc < 0)
                 CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
         cfs_up_write(&loghandle->lgh_lock);
         if (rc == -ENOSPC) {
-                /* to create a new plain log */
-               loghandle = llog_cat_current_log(env, cathandle, 1);
-               if (IS_ERR(loghandle))
-                       RETURN(PTR_ERR(loghandle));
+               /* try to use next log */
+               loghandle = llog_cat_current_log(cathandle, th);
+               LASSERT(!IS_ERR(loghandle));
+               /* new llog can be created concurrently */
+               if (!llog_exist(loghandle)) {
+                       rc = llog_cat_new_log(env, cathandle, loghandle, th);
+                       if (rc < 0) {
+                               cfs_up_write(&loghandle->lgh_lock);
+                               RETURN(rc);
+                       }
+               }
+               /* now let's try to add the record */
                rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf,
-                                   -1);
-                cfs_up_write(&loghandle->lgh_lock);
-        }
+                                   -1, th);
+               if (rc < 0)
+                       CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
+               cfs_up_write(&loghandle->lgh_lock);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_add_rec);
 
+int llog_cat_declare_add_rec(const struct lu_env *env,
+                            struct llog_handle *cathandle,
+                            struct llog_rec_hdr *rec, struct thandle *th)
+{
+       struct llog_handle      *loghandle, *next;
+       int                      rc = 0;
+
+       ENTRY;
+
+       if (cathandle->u.chd.chd_current_log == NULL) {
+               /* declare new plain llog */
+               cfs_down_write(&cathandle->lgh_lock);
+               if (cathandle->u.chd.chd_current_log == NULL) {
+                       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
+                                      NULL, NULL, LLOG_OPEN_NEW);
+                       if (rc == 0) {
+                               cathandle->u.chd.chd_current_log = loghandle;
+                               cfs_list_add_tail(&loghandle->u.phd.phd_entry,
+                                                 &cathandle->u.chd.chd_head);
+                       }
+               }
+               cfs_up_write(&cathandle->lgh_lock);
+       } else if (cathandle->u.chd.chd_next_log == NULL) {
+               /* declare next plain llog */
+               cfs_down_write(&cathandle->lgh_lock);
+               if (cathandle->u.chd.chd_next_log == NULL) {
+                       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
+                                      NULL, NULL, LLOG_OPEN_NEW);
+                       if (rc == 0) {
+                               cathandle->u.chd.chd_next_log = loghandle;
+                               cfs_list_add_tail(&loghandle->u.phd.phd_entry,
+                                                 &cathandle->u.chd.chd_head);
+                       }
+               }
+               cfs_up_write(&cathandle->lgh_lock);
+       }
+       if (rc)
+               GOTO(out, rc);
+
+       if (!llog_exist(cathandle->u.chd.chd_current_log)) {
+               rc = llog_declare_create(env, cathandle->u.chd.chd_current_log,
+                                        th);
+               if (rc)
+                       GOTO(out, rc);
+               llog_declare_write_rec(env, cathandle, NULL, -1, th);
+       }
+       /* declare records in the llogs */
+       rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
+                                   rec, -1, th);
+       if (rc)
+               GOTO(out, rc);
+
+       next = cathandle->u.chd.chd_next_log;
+       if (next) {
+               if (!llog_exist(next)) {
+                       rc = llog_declare_create(env, next, th);
+                       llog_declare_write_rec(env, cathandle, NULL, -1, th);
+               }
+               llog_declare_write_rec(env, next, rec, -1, th);
+       }
+out:
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_declare_add_rec);
+
+int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
+                struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+                void *buf)
+{
+       struct llog_ctxt        *ctxt;
+       struct dt_device        *dt;
+       struct thandle          *th = NULL;
+       int                      rc;
+
+       ctxt = cathandle->lgh_ctxt;
+       LASSERT(ctxt);
+       LASSERT(ctxt->loc_exp);
+
+       if (cathandle->lgh_obj != NULL) {
+               dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
+               LASSERT(dt);
+
+               th = dt_trans_create(env, dt);
+               if (IS_ERR(th))
+                       RETURN(PTR_ERR(th));
+
+               rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               rc = dt_trans_start_local(env, dt, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+               rc = llog_cat_add_rec(env, cathandle, rec, reccookie, buf, th);
+out_trans:
+               dt_trans_stop(env, dt, th);
+       } else { /* lvfs compat code */
+               LASSERT(cathandle->lgh_file != NULL);
+               rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
+               if (rc == 0)
+                       rc = llog_cat_add_rec(env, cathandle, rec, reccookie,
+                                             buf, th);
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_add);
+
 /* For each cookie in the cookie array, we clear the log in-use bit and either:
  * - the log is empty, so mark it free in the catalog header and delete it
  * - the log is not empty, just write out the log header
@@ -364,14 +499,14 @@ int llog_cat_cancel_records(const struct lu_env *env,
                            struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies)
 {
-       int i, index, rc = 0;
+       int i, index, rc = 0, failed = 0;
 
        ENTRY;
 
-       cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
        for (i = 0; i < count; i++, cookies++) {
                struct llog_handle      *loghandle;
                struct llog_logid       *lgl = &cookies->lgc_lgl;
+               int                      lrc;
 
                rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
                if (rc) {
@@ -379,28 +514,36 @@ int llog_cat_cancel_records(const struct lu_env *env,
                        break;
                }
 
-               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-               rc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
-               cfs_up_write(&loghandle->lgh_lock);
-
-               if (rc == 1) {          /* log has been destroyed */
+               lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
+               if (lrc == 1) {          /* log has been destroyed */
                        index = loghandle->u.phd.phd_cookie.lgc_index;
+                       cfs_down_write(&cathandle->lgh_lock);
                        if (cathandle->u.chd.chd_current_log == loghandle)
                                cathandle->u.chd.chd_current_log = NULL;
+                       cfs_up_write(&cathandle->lgh_lock);
                        llog_close(env, loghandle);
 
                        LASSERT(index);
                        llog_cat_set_first_idx(cathandle, index);
-                       rc = llog_cancel_rec(env, cathandle, index);
-                        if (rc == 0)
-                                CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
-                                       " of catalog "LPX64"\n",
-                                       index, cathandle->lgh_id.lgl_oid);
-                }
-        }
-        cfs_up_write(&cathandle->lgh_lock);
+                       lrc = llog_cancel_rec(env, cathandle, index);
+                       if (lrc == 0)
+                               CDEBUG(D_RPCTRACE, "cancel plain log at index"
+                                      " %u of catalog "LPX64"\n",
+                                      index, cathandle->lgh_id.lgl_oid);
+               } else if (lrc == -ENOENT) {
+                       if (rc == 0) /* ENOENT shouldn't rewrite any error */
+                               rc = lrc;
+               } else if (lrc < 0) {
+                       failed++;
+                       rc = lrc;
+               }
+       }
+       if (rc)
+               CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
+                      cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
+                      rc);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_cancel_records);