Whamcloud - gitweb
LU-1302 llog: modify llog_write/llog_add to support OSD
authorMikhail Pershin <tappro@whamcloud.com>
Mon, 10 Sep 2012 13:06:32 +0000 (17:06 +0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 24 Sep 2012 19:46:15 +0000 (15:46 -0400)
- add transaction and declarations in API to work over OSD
- change logic in llog catalog to open plain llogs during
  declarations
- keep lop_obd_add to work via chain of obds, e.g. MDS-LOV-OSC.
  It will be not needed with LOD/OSP anymore

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: Ifba546fcb5fe8434fd4aa21a21b831737df73729
Reviewed-on: http://review.whamcloud.com/3923
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
20 files changed:
lustre/include/linux/lvfs.h
lustre/include/lustre_log.h
lustre/lov/lov_log.c
lustre/mdd/mdd_device.c
lustre/mdd/mdd_lov.c
lustre/mds/mds_log.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_llog.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_log.c
lustre/osc/osc_request.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/sec_config.c

index f949552..e2866a5 100644 (file)
@@ -79,6 +79,7 @@ struct lvfs_run_ctxt {
         int                      ngroups;
         struct lvfs_callback_ops cb_ops;
         struct group_info       *group_info;
+       struct dt_device        *dt;
 #ifdef OBD_CTXT_DEBUG
         __u32                    magic;
 #endif
index e0124ac..ebd5591 100644 (file)
@@ -187,9 +187,9 @@ struct llog_process_cat_args {
 int cat_cancel_cb(const struct lu_env *env, struct llog_handle *cathandle,
                  struct llog_rec_hdr *rec, void *data);
 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle);
-int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
-                    struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
-                    void *buf);
+int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
+                struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+                void *buf);
 int llog_cat_cancel_records(const struct lu_env *env,
                            struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies);
@@ -215,9 +215,9 @@ int llog_setup(struct obd_device *obd, struct obd_llog_group *olg, int index,
 int __llog_ctxt_put(struct llog_ctxt *ctxt);
 int llog_cleanup(struct llog_ctxt *);
 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp, int flags);
-int llog_add(const struct lu_env *env, struct llog_ctxt *ctxt,
-            struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-            struct llog_cookie *logcookies, int numcookies);
+int llog_obd_add(const struct lu_env *env, struct llog_ctxt *ctxt,
+                struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
+                struct llog_cookie *logcookies, int numcookies);
 int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
                struct lov_stripe_md *lsm, int count,
                struct llog_cookie *cookies, int flags);
@@ -311,18 +311,30 @@ struct llog_operations {
         * write new record in llog. It appends records usually but can edit
         * existing records too.
         */
+       int (*lop_declare_write_rec)(const struct lu_env *env,
+                                    struct llog_handle *lgh,
+                                    struct llog_rec_hdr *rec,
+                                    int idx, struct thandle *th);
        int (*lop_write_rec)(const struct lu_env *env,
                             struct llog_handle *loghandle,
                             struct llog_rec_hdr *rec,
-                            struct llog_cookie *logcookies, int numcookies,
-                            void *buf, int idx);
+                            struct llog_cookie *cookie, int cookiecount,
+                            void *buf, int idx, struct thandle *th);
        /**
         * Add new record in llog catalog. Does the same as llog_write_rec()
         * but using llog catalog.
         */
-       int (*lop_add)(const struct lu_env *env, struct llog_ctxt *ctxt,
-                      struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                      struct llog_cookie *logcookies, int numcookies);
+       int (*lop_declare_add)(const struct lu_env *env,
+                              struct llog_handle *lgh,
+                              struct llog_rec_hdr *rec, struct thandle *th);
+       int (*lop_add)(const struct lu_env *env, struct llog_handle *lgh,
+                      struct llog_rec_hdr *rec, struct llog_cookie *cookie,
+                      void *buf, struct thandle *th);
+       /* Old llog_add version, used in MDS-LOV-OSC now and will gone with
+        * LOD/OSP replacement */
+       int (*lop_obd_add)(const struct lu_env *env, struct llog_ctxt *ctxt,
+                          struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
+                          struct llog_cookie *logcookies, int numcookies);
 };
 
 /* In-memory descriptor for a log object or log catalog */
@@ -606,42 +618,6 @@ static inline int llog_ctxt_null(struct obd_device *obd, int index)
         return (llog_group_ctxt_null(&obd->obd_olg, index));
 }
 
-static inline int llog_write_rec(const struct lu_env *env,
-                                struct llog_handle *handle,
-                                struct llog_rec_hdr *rec,
-                                struct llog_cookie *logcookies,
-                                int numcookies, void *buf, int idx)
-{
-       struct llog_operations *lop;
-       int raised, rc, buflen;
-
-       ENTRY;
-
-       rc = llog_handle2ops(handle, &lop);
-       if (rc)
-               RETURN(rc);
-       LASSERT(lop);
-       if (lop->lop_write_rec == NULL)
-               RETURN(-EOPNOTSUPP);
-
-       /* FIXME:  Why doesn't caller just set the right lrh_len itself? */
-       if (buf)
-               buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
-                        sizeof(struct llog_rec_tail);
-       else
-               buflen = rec->lrh_len;
-       LASSERT(cfs_size_round(buflen) == buflen);
-
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-       rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
-                               buf, idx);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-       RETURN(rc);
-}
-
 static inline int llog_destroy(const struct lu_env *env,
                               struct llog_handle *handle)
 {
@@ -719,88 +695,28 @@ static inline int llog_connect(struct llog_ctxt *ctxt,
        RETURN(rc);
 }
 
-/**
- * new llog API
- *
- * API functions:
- *      llog_open - open llog, may not exist
- *      llog_exist - check if llog exists
- *      llog_close - close opened llog, pair for open, frees llog_handle
- *      llog_declare_create - declare llog creation
- *      llog_create - create new llog on disk, need transaction handle
- */
-static inline int llog_exist(struct llog_handle *loghandle)
-{
-       struct llog_operations *lop;
-       int raised, rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(loghandle, &lop);
-       if (rc)
-               RETURN(rc);
-       if (lop->lop_exist == NULL)
-               RETURN(-EOPNOTSUPP);
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-       rc = lop->lop_exist(loghandle);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-       RETURN(rc);
-}
-
-static inline int llog_declare_create(const struct lu_env *env,
-                                     struct llog_handle *loghandle,
-                                     struct thandle *th)
-{
-       struct llog_operations  *lop;
-       int                      raised, rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(loghandle, &lop);
-       if (rc)
-               RETURN(rc);
-       if (lop->lop_declare_create == NULL)
-               RETURN(-EOPNOTSUPP);
-
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-       rc = lop->lop_declare_create(env, loghandle, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-       RETURN(rc);
-}
-
-static inline int llog_create(const struct lu_env *env,
-                             struct llog_handle *handle, struct thandle *th)
-{
-       struct llog_operations  *lop;
-       int                      raised, rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(handle, &lop);
-       if (rc)
-               RETURN(rc);
-       if (lop->lop_create == NULL)
-               RETURN(-EOPNOTSUPP);
-
-       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
-       if (!raised)
-               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-       rc = lop->lop_create(env, handle, th);
-       if (!raised)
-               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-       RETURN(rc);
-}
-
+/* llog.c */
+int llog_exist(struct llog_handle *loghandle);
+int llog_declare_create(const struct lu_env *env,
+                       struct llog_handle *loghandle, struct thandle *th);
+int llog_create(const struct lu_env *env, struct llog_handle *handle,
+               struct thandle *th);
+int llog_declare_write_rec(const struct lu_env *env,
+                          struct llog_handle *handle,
+                          struct llog_rec_hdr *rec, int idx,
+                          struct thandle *th);
+int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
+                  struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+                  int numcookies, void *buf, int idx, struct thandle *th);
+int llog_add(const struct lu_env *env, struct llog_handle *lgh,
+            struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+            void *buf, struct thandle *th);
+int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
+                    struct llog_rec_hdr *rec, struct thandle *th);
 int lustre_process_log(struct super_block *sb, char *logname,
-                       struct config_llog_instance *cfg);
+                      struct config_llog_instance *cfg);
 int lustre_end_log(struct super_block *sb, char *logname,
-                   struct config_llog_instance *cfg);
+                  struct config_llog_instance *cfg);
 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
                     struct llog_handle **res, struct llog_logid *logid,
                     char *name);
index 0f02e55..a12cf2b 100644 (file)
@@ -103,13 +103,13 @@ static int lov_llog_origin_add(const struct lu_env *env,
                         break;
                 }
 
-                /* inject error in llog_add() below */
-                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FAIL_LOV_LOG_ADD)) {
-                        llog_ctxt_put(cctxt);
-                        cctxt = NULL;
-                }
-               rc = llog_add(env, cctxt, rec, NULL, logcookies + cookies,
-                             numcookies - cookies);
+               /* inject error in llog_obd_add() below */
+               if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FAIL_LOV_LOG_ADD)) {
+                       llog_ctxt_put(cctxt);
+                       cctxt = NULL;
+               }
+               rc = llog_obd_add(env, cctxt, rec, NULL, logcookies + cookies,
+                                 numcookies - cookies);
                 llog_ctxt_put(cctxt);
                 if (rc < 0) {
                         CERROR("Can't add llog (rc = %d) for stripe %d\n",
@@ -118,7 +118,7 @@ static int lov_llog_origin_add(const struct lu_env *env,
                                sizeof(struct llog_cookie));
                         rc = 1; /* skip this cookie */
                 }
-                /* Note that rc is always 1 if llog_add was successful */
+               /* Note that rc is always 1 if llog_obd_add was successful */
                 cookies += rc;
         }
         RETURN(cookies);
@@ -200,12 +200,12 @@ static int lov_llog_repl_cancel(const struct lu_env *env,
 }
 
 static struct llog_operations lov_mds_ost_orig_logops = {
-        lop_add: lov_llog_origin_add,
-        lop_connect: lov_llog_origin_connect
+       .lop_obd_add    = lov_llog_origin_add,
+       .lop_connect    = lov_llog_origin_connect,
 };
 
 static struct llog_operations lov_size_repl_logops = {
-        lop_cancel: lov_llog_repl_cancel
+       .lop_cancel     = lov_llog_repl_cancel,
 };
 
 int lov_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
index 35f80f4..92d75bf 100644 (file)
@@ -326,7 +326,7 @@ int mdd_changelog_llog_write(struct mdd_device         *mdd,
                 return -ENXIO;
 
         /* nested journal transaction */
-       rc = llog_add(NULL, ctxt, &rec->cr_hdr, NULL, NULL, 0);
+       rc = llog_obd_add(NULL, ctxt, &rec->cr_hdr, NULL, NULL, 0);
         llog_ctxt_put(ctxt);
 
         return rc;
@@ -362,7 +362,7 @@ int mdd_changelog_ext_llog_write(struct mdd_device *mdd,
                return -ENXIO;
 
        /* nested journal transaction */
-       rc = llog_add(NULL, ctxt, &rec->cr_hdr, NULL, NULL, 0);
+       rc = llog_obd_add(NULL, ctxt, &rec->cr_hdr, NULL, NULL, 0);
        llog_ctxt_put(ctxt);
 
        return rc;
@@ -1388,7 +1388,7 @@ static int mdd_changelog_user_register(struct mdd_device *mdd, int *id)
         rec->cur_endrec = mdd->mdd_cl.mc_index;
         cfs_spin_unlock(&mdd->mdd_cl.mc_user_lock);
 
-       rc = llog_add(NULL, ctxt, &rec->cur_hdr, NULL, NULL, 0);
+       rc = llog_obd_add(NULL, ctxt, &rec->cur_hdr, NULL, NULL, 0);
 
         CDEBUG(D_IOCTL, "Registered changelog user %d\n", *id);
 out:
@@ -1516,7 +1516,7 @@ stop:
         /* hdr+1 is loc of data */
         hdr->lrh_len -= sizeof(*hdr) + sizeof(struct llog_rec_tail);
        rc = llog_write_rec(env, llh, hdr, NULL, 0, (void *)(hdr + 1),
-                           hdr->lrh_index);
+                           hdr->lrh_index, NULL);
 
         RETURN(rc);
 }
index 00c8f38..defe810 100644 (file)
@@ -789,8 +789,8 @@ int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
 
         /* write setattr log */
         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
-       rc = llog_add(NULL, ctxt, &lsr->lsr_hdr, lsm, logcookies,
-                     cookies_size / sizeof(struct llog_cookie));
+       rc = llog_obd_add(NULL, ctxt, &lsr->lsr_hdr, lsm, logcookies,
+                         cookies_size / sizeof(struct llog_cookie));
 
         llog_ctxt_put(ctxt);
 
index eceaa9b..1b7e5ad 100644 (file)
@@ -66,7 +66,7 @@ static int mds_llog_origin_add(const struct lu_env *env,
         ENTRY;
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
-       rc = llog_add(env, lctxt, rec, lsm, logcookies, numcookies);
+       rc = llog_obd_add(env, lctxt, rec, lsm, logcookies, numcookies);
         llog_ctxt_put(lctxt);
 
         RETURN(rc);
@@ -90,8 +90,8 @@ static int mds_llog_origin_connect(struct llog_ctxt *ctxt,
 }
 
 static struct llog_operations mds_ost_orig_logops = {
-        lop_add:        mds_llog_origin_add,
-        lop_connect:    mds_llog_origin_connect,
+       .lop_obd_add    = mds_llog_origin_add,
+       .lop_connect    = mds_llog_origin_connect,
 };
 
 static int mds_llog_repl_cancel(const struct lu_env *env,
@@ -204,7 +204,7 @@ int mds_changelog_llog_init(struct obd_device *obd, struct obd_device *tgt)
         changelog_orig_logops = llog_lvfs_ops;
         changelog_orig_logops.lop_setup = llog_obd_origin_setup;
         changelog_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
-        changelog_orig_logops.lop_add = llog_obd_origin_add;
+       changelog_orig_logops.lop_obd_add = llog_obd_origin_add;
         changelog_orig_logops.lop_cancel = llog_changelog_cancel;
 
         rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT,
@@ -315,7 +315,7 @@ static int mds_llog_add_unlink(struct obd_device *obd,
         lur->lur_count = count;
 
         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
-       rc = llog_add(NULL, ctxt, &lur->lur_hdr, lsm, logcookie, cookies);
+       rc = llog_obd_add(NULL, ctxt, &lur->lur_hdr, lsm, logcookie, cookies);
         llog_ctxt_put(ctxt);
 
         OBD_FREE_PTR(lur);
index e4bc00a..3e7dd17 100644 (file)
@@ -1593,8 +1593,8 @@ static int mgc_copy_handler(const struct lu_env *env, struct llog_handle *llh,
 
         /* Append all records */
         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
-       rc = llog_write_rec(env, local_llh, &local_rec, NULL, 0,
-                            (void *)cfg_buf, -1);
+       rc = llog_write(env, local_llh, &local_rec, NULL, 0,
+                       (void *)cfg_buf, -1);
 
         lcfg = (struct lustre_cfg *)cfg_buf;
         CDEBUG(D_INFO, "idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
index 969547b..cd5264e 100644 (file)
@@ -649,8 +649,8 @@ static int mgs_modify_handler(const struct lu_env *env,
                 /* Header and tail are added back to lrh_len in
                    llog_lvfs_write_rec */
                 rec->lrh_len = cfg_len;
-               rc = llog_write_rec(NULL, llh, rec, NULL, 0, (void *)lcfg,
-                                   rec->lrh_index);
+               rc = llog_write(NULL, llh, rec, NULL, 0, (void *)lcfg,
+                               rec->lrh_index);
                 if (!rc)
                          mml->mml_modified++;
         }
@@ -741,7 +741,7 @@ static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         /* idx = -1 means append */
-       rc = llog_write_rec(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
+       rc = llog_write(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc)
                 CERROR("failed %d\n", rc);
index 3b0817a..0c643b2 100644 (file)
@@ -136,7 +136,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        }
        cfs_spin_unlock(&loghandle->lgh_hdr_lock);
 
-       rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+       rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
        if (rc < 0) {
                CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
                       "#%08x: rc = %d\n",
@@ -570,6 +570,189 @@ out:
 EXPORT_SYMBOL(llog_reverse_process);
 
 /**
+ * new llog API
+ *
+ * API functions:
+ *      llog_open - open llog, may not exist
+ *      llog_exist - check if llog exists
+ *      llog_close - close opened llog, pair for open, frees llog_handle
+ *      llog_declare_create - declare llog creation
+ *      llog_create - create new llog on disk, need transaction handle
+ *      llog_declare_write_rec - declaration of llog write
+ *      llog_write_rec - write llog record on disk, need transaction handle
+ *      llog_declare_add - declare llog catalog record addition
+ *      llog_add - add llog record in catalog, need transaction handle
+ */
+int llog_exist(struct llog_handle *loghandle)
+{
+       struct llog_operations  *lop;
+       int                      rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(loghandle, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_exist == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       rc = lop->lop_exist(loghandle);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_exist);
+
+int llog_declare_create(const struct lu_env *env,
+                       struct llog_handle *loghandle, struct thandle *th)
+{
+       struct llog_operations  *lop;
+       int                      raised, rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(loghandle, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_declare_create == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lop->lop_declare_create(env, loghandle, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_create);
+
+int llog_create(const struct lu_env *env, struct llog_handle *handle,
+               struct thandle *th)
+{
+       struct llog_operations  *lop;
+       int                      raised, rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_create == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lop->lop_create(env, handle, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_create);
+
+int llog_declare_write_rec(const struct lu_env *env,
+                          struct llog_handle *handle,
+                          struct llog_rec_hdr *rec, int idx,
+                          struct thandle *th)
+{
+       struct llog_operations  *lop;
+       int                      raised, rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc)
+               RETURN(rc);
+       LASSERT(lop);
+       if (lop->lop_declare_write_rec == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_write_rec);
+
+int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
+                  struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+                  int numcookies, void *buf, int idx, struct thandle *th)
+{
+       struct llog_operations  *lop;
+       int                      raised, rc, buflen;
+
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc)
+               RETURN(rc);
+
+       LASSERT(lop);
+       if (lop->lop_write_rec == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       if (buf)
+               buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
+                        sizeof(struct llog_rec_tail);
+       else
+               buflen = rec->lrh_len;
+       LASSERT(cfs_size_round(buflen) == buflen);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
+                               buf, idx, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_write_rec);
+
+int llog_add(const struct lu_env *env, struct llog_handle *lgh,
+            struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
+            void *buf, struct thandle *th)
+{
+       int raised, rc;
+
+       ENTRY;
+
+       if (lgh->lgh_logops->lop_add == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_add);
+
+int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
+                    struct llog_rec_hdr *rec, struct thandle *th)
+{
+       int raised, rc;
+
+       ENTRY;
+
+       if (lgh->lgh_logops->lop_declare_add == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_declare_add);
+
+/**
  * Helper function to open llog or create it if doesn't exist.
  * It hides all transaction handling from caller.
  */
@@ -647,6 +830,56 @@ int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
 }
 EXPORT_SYMBOL(llog_erase);
 
+/*
+ * Helper function for write record in llog.
+ * It hides all transaction handling from caller.
+ * Valid only with local llog.
+ */
+int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
+              struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+              int cookiecount, void *buf, int idx)
+{
+       int rc;
+
+       ENTRY;
+
+       LASSERT(loghandle);
+       LASSERT(loghandle->lgh_ctxt);
+
+       if (loghandle->lgh_obj != NULL) {
+               struct dt_device        *dt;
+               struct thandle          *th;
+
+               dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
+
+               th = dt_trans_create(env, dt);
+               if (IS_ERR(th))
+                       RETURN(PTR_ERR(th));
+
+               rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               rc = dt_trans_start_local(env, dt, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               cfs_down_write(&loghandle->lgh_lock);
+               rc = llog_write_rec(env, loghandle, rec, reccookie,
+                                   cookiecount, buf, idx, th);
+               cfs_up_write(&loghandle->lgh_lock);
+out_trans:
+               dt_trans_stop(env, dt, th);
+       } else { /* lvfs compatibility */
+               cfs_down_write(&loghandle->lgh_lock);
+               rc = llog_write_rec(env, loghandle, rec, reccookie,
+                                   cookiecount, buf, idx, NULL);
+               cfs_up_write(&loghandle->lgh_lock);
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_write);
+
 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
              struct llog_handle **lgh, struct llog_logid *logid,
              char *name, enum llog_open_param open_param)
index 50172ff..a3c5873 100644 (file)
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Intel, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -40,6 +42,8 @@
  *   if an OST or MDS fails it need only look at log(s) relevant to itself
  *
  * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOG
  *
  * Assumes caller has already pushed us into the kernel context and is locking.
  */
-static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
-                                           struct llog_handle *cathandle)
+static int llog_cat_new_log(const struct lu_env *env,
+                           struct llog_handle *cathandle,
+                           struct llog_handle *loghandle,
+                           struct thandle *th)
 {
-        struct llog_handle *loghandle;
+
         struct llog_log_hdr *llh;
         struct llog_logid_rec rec = { { 0 }, };
         int rc, index, bitmap_size;
@@ -74,16 +80,21 @@ static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
         /* maximum number of available slots in catlog is bitmap_size - 2 */
         if (llh->llh_cat_idx == index) {
                 CERROR("no free catalog slots for log...\n");
-                RETURN(ERR_PTR(-ENOSPC));
-        }
+               RETURN(-ENOSPC);
+       }
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
-                RETURN(ERR_PTR(-ENOSPC));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
+               RETURN(-ENOSPC);
 
-       rc = llog_open_create(env, cathandle->lgh_ctxt, &loghandle, NULL,
-                             NULL);
-       if (rc)
-               RETURN(ERR_PTR(rc));
+       rc = llog_create(env, loghandle, th);
+       /* if llog is already created, no need to initialize it */
+       if (rc == -EEXIST) {
+               RETURN(0);
+       } else if (rc != 0) {
+               CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
+                      loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+               RETURN(rc);
+       }
 
        rc = llog_init_handle(env, loghandle,
                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
@@ -120,22 +131,15 @@ static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
 
         /* update the catalog: header and record */
        rc = llog_write_rec(env, cathandle, &rec.lid_hdr,
-                            &loghandle->u.phd.phd_cookie, 1, NULL, index);
-        if (rc < 0) {
-                GOTO(out_destroy, rc);
-        }
-
-        loghandle->lgh_hdr->llh_cat_idx = index;
-        cathandle->u.chd.chd_current_log = loghandle;
-        LASSERT(cfs_list_empty(&loghandle->u.phd.phd_entry));
-        cfs_list_add_tail(&loghandle->u.phd.phd_entry,
-                          &cathandle->u.chd.chd_head);
-
-out_destroy:
+                           &loghandle->u.phd.phd_cookie, 1, NULL, index, th);
        if (rc < 0)
-               llog_destroy(env, loghandle);
+               GOTO(out_destroy, rc);
 
-       RETURN(loghandle);
+       loghandle->lgh_hdr->llh_cat_idx = index;
+       RETURN(0);
+out_destroy:
+       llog_destroy(env, loghandle);
+       RETURN(rc);
 }
 
 /* Open an existent log handle and add it to the open list.
@@ -155,6 +159,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
        if (cathandle == NULL)
                RETURN(-EBADF);
 
+       cfs_down_write(&cathandle->lgh_lock);
        cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
                                u.phd.phd_entry) {
                struct llog_logid *cgl = &loghandle->lgh_id;
@@ -168,9 +173,11 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                                continue;
                        }
                        loghandle->u.phd.phd_cat_handle = cathandle;
+                       cfs_up_write(&cathandle->lgh_lock);
                        GOTO(out, rc = 0);
                }
        }
+       cfs_up_write(&cathandle->lgh_lock);
 
        rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
                       LLOG_OPEN_EXISTS);
@@ -187,11 +194,14 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                GOTO(out, rc);
        }
 
+       cfs_down_write(&cathandle->lgh_lock);
        cfs_list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+       cfs_up_write(&cathandle->lgh_lock);
 
        loghandle->u.phd.phd_cat_handle = cathandle;
        loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-       loghandle->u.phd.phd_cookie.lgc_index = loghandle->lgh_hdr->llh_cat_idx;
+       loghandle->u.phd.phd_cookie.lgc_index =
+                               loghandle->lgh_hdr->llh_cat_idx;
        EXIT;
 out:
        *res = loghandle;
@@ -262,9 +272,8 @@ enum {
  *
  * NOTE: loghandle is write-locked upon successful return
  */
-static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
-                                               struct llog_handle *cathandle,
-                                               int create)
+static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
+                                               struct thandle *th)
 {
         struct llog_handle *loghandle = NULL;
         ENTRY;
@@ -272,33 +281,31 @@ static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
         cfs_down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
-                struct llog_log_hdr *llh = loghandle->lgh_hdr;
+               struct llog_log_hdr *llh;
 
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-                if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
+               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               llh = loghandle->lgh_hdr;
+               if (llh == NULL ||
+                   loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
                         cfs_up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
                         cfs_up_write(&loghandle->lgh_lock);
                 }
         }
-        if (!create) {
-                if (loghandle)
-                        cfs_down_write(&loghandle->lgh_lock);
-                cfs_up_read(&cathandle->lgh_lock);
-                RETURN(loghandle);
-        }
         cfs_up_read(&cathandle->lgh_lock);
 
-        /* time to create new log */
+       /* time to use next log */
 
-        /* first, we have to make sure the state hasn't changed */
-        cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
-        loghandle = cathandle->u.chd.chd_current_log;
-        if (loghandle) {
-                struct llog_log_hdr *llh = loghandle->lgh_hdr;
+       /* first, we have to make sure the state hasn't changed */
+       cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+       loghandle = cathandle->u.chd.chd_current_log;
+       if (loghandle) {
+               struct llog_log_hdr *llh;
 
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               llh = loghandle->lgh_hdr;
+               LASSERT(llh);
                 if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
                         cfs_up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
@@ -307,12 +314,15 @@ static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
                 }
         }
 
-        CDEBUG(D_INODE, "creating new log\n");
-       loghandle = llog_cat_new_log(env, cathandle);
-        if (!IS_ERR(loghandle))
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-        cfs_up_write(&cathandle->lgh_lock);
-        RETURN(loghandle);
+       CDEBUG(D_INODE, "use next log\n");
+
+       loghandle = cathandle->u.chd.chd_next_log;
+       cathandle->u.chd.chd_current_log = loghandle;
+       cathandle->u.chd.chd_next_log = NULL;
+       cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+       cfs_up_write(&cathandle->lgh_lock);
+       LASSERT(loghandle);
+       RETURN(loghandle);
 }
 
 /* Add a single record to the recovery log(s) using a catalog
@@ -322,35 +332,160 @@ static struct llog_handle *llog_cat_current_log(const struct lu_env *env,
  */
 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                     struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
-                    void *buf)
+                    void *buf, struct thandle *th)
 {
         struct llog_handle *loghandle;
         int rc;
         ENTRY;
 
         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
-       loghandle = llog_cat_current_log(env, cathandle, 1);
-       if (IS_ERR(loghandle))
-               RETURN(PTR_ERR(loghandle));
+       loghandle = llog_cat_current_log(cathandle, th);
+       LASSERT(!IS_ERR(loghandle));
+
        /* loghandle is already locked by llog_cat_current_log() for us */
-       rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, -1);
+       if (!llog_exist(loghandle)) {
+               rc = llog_cat_new_log(env, cathandle, loghandle, th);
+               if (rc < 0) {
+                       cfs_up_write(&loghandle->lgh_lock);
+                       RETURN(rc);
+               }
+       }
+       /* now let's try to add the record */
+       rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, -1, th);
         if (rc < 0)
                 CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
         cfs_up_write(&loghandle->lgh_lock);
         if (rc == -ENOSPC) {
-                /* to create a new plain log */
-               loghandle = llog_cat_current_log(env, cathandle, 1);
-               if (IS_ERR(loghandle))
-                       RETURN(PTR_ERR(loghandle));
+               /* try to use next log */
+               loghandle = llog_cat_current_log(cathandle, th);
+               LASSERT(!IS_ERR(loghandle));
+               /* new llog can be created concurrently */
+               if (!llog_exist(loghandle)) {
+                       rc = llog_cat_new_log(env, cathandle, loghandle, th);
+                       if (rc < 0) {
+                               cfs_up_write(&loghandle->lgh_lock);
+                               RETURN(rc);
+                       }
+               }
+               /* now let's try to add the record */
                rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf,
-                                   -1);
-                cfs_up_write(&loghandle->lgh_lock);
-        }
+                                   -1, th);
+               if (rc < 0)
+                       CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
+               cfs_up_write(&loghandle->lgh_lock);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_add_rec);
 
+int llog_cat_declare_add_rec(const struct lu_env *env,
+                            struct llog_handle *cathandle,
+                            struct llog_rec_hdr *rec, struct thandle *th)
+{
+       struct llog_handle      *loghandle, *next;
+       int                      rc = 0;
+
+       ENTRY;
+
+       if (cathandle->u.chd.chd_current_log == NULL) {
+               /* declare new plain llog */
+               cfs_down_write(&cathandle->lgh_lock);
+               if (cathandle->u.chd.chd_current_log == NULL) {
+                       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
+                                      NULL, NULL, LLOG_OPEN_NEW);
+                       if (rc == 0) {
+                               cathandle->u.chd.chd_current_log = loghandle;
+                               cfs_list_add_tail(&loghandle->u.phd.phd_entry,
+                                                 &cathandle->u.chd.chd_head);
+                       }
+               }
+               cfs_up_write(&cathandle->lgh_lock);
+       } else if (cathandle->u.chd.chd_next_log == NULL) {
+               /* declare next plain llog */
+               cfs_down_write(&cathandle->lgh_lock);
+               if (cathandle->u.chd.chd_next_log == NULL) {
+                       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
+                                      NULL, NULL, LLOG_OPEN_NEW);
+                       if (rc == 0) {
+                               cathandle->u.chd.chd_next_log = loghandle;
+                               cfs_list_add_tail(&loghandle->u.phd.phd_entry,
+                                                 &cathandle->u.chd.chd_head);
+                       }
+               }
+               cfs_up_write(&cathandle->lgh_lock);
+       }
+       if (rc)
+               GOTO(out, rc);
+
+       if (!llog_exist(cathandle->u.chd.chd_current_log)) {
+               rc = llog_declare_create(env, cathandle->u.chd.chd_current_log,
+                                        th);
+               if (rc)
+                       GOTO(out, rc);
+               llog_declare_write_rec(env, cathandle, NULL, -1, th);
+       }
+       /* declare records in the llogs */
+       rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
+                                   rec, -1, th);
+       if (rc)
+               GOTO(out, rc);
+
+       next = cathandle->u.chd.chd_next_log;
+       if (next) {
+               if (!llog_exist(next)) {
+                       rc = llog_declare_create(env, next, th);
+                       llog_declare_write_rec(env, cathandle, NULL, -1, th);
+               }
+               llog_declare_write_rec(env, next, rec, -1, th);
+       }
+out:
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_declare_add_rec);
+
+int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
+                struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+                void *buf)
+{
+       struct llog_ctxt        *ctxt;
+       struct dt_device        *dt;
+       struct thandle          *th = NULL;
+       int                      rc;
+
+       ctxt = cathandle->lgh_ctxt;
+       LASSERT(ctxt);
+       LASSERT(ctxt->loc_exp);
+
+       if (cathandle->lgh_obj != NULL) {
+               dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
+               LASSERT(dt);
+
+               th = dt_trans_create(env, dt);
+               if (IS_ERR(th))
+                       RETURN(PTR_ERR(th));
+
+               rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               rc = dt_trans_start_local(env, dt, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+               rc = llog_cat_add_rec(env, cathandle, rec, reccookie, buf, th);
+out_trans:
+               dt_trans_stop(env, dt, th);
+       } else { /* lvfs compat code */
+               LASSERT(cathandle->lgh_file != NULL);
+               rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
+               if (rc == 0)
+                       rc = llog_cat_add_rec(env, cathandle, rec, reccookie,
+                                             buf, th);
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_add);
+
 /* For each cookie in the cookie array, we clear the log in-use bit and either:
  * - the log is empty, so mark it free in the catalog header and delete it
  * - the log is not empty, just write out the log header
@@ -364,14 +499,14 @@ int llog_cat_cancel_records(const struct lu_env *env,
                            struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies)
 {
-       int i, index, rc = 0;
+       int i, index, rc = 0, failed = 0;
 
        ENTRY;
 
-       cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
        for (i = 0; i < count; i++, cookies++) {
                struct llog_handle      *loghandle;
                struct llog_logid       *lgl = &cookies->lgc_lgl;
+               int                      lrc;
 
                rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
                if (rc) {
@@ -379,28 +514,36 @@ int llog_cat_cancel_records(const struct lu_env *env,
                        break;
                }
 
-               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-               rc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
-               cfs_up_write(&loghandle->lgh_lock);
-
-               if (rc == 1) {          /* log has been destroyed */
+               lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
+               if (lrc == 1) {          /* log has been destroyed */
                        index = loghandle->u.phd.phd_cookie.lgc_index;
+                       cfs_down_write(&cathandle->lgh_lock);
                        if (cathandle->u.chd.chd_current_log == loghandle)
                                cathandle->u.chd.chd_current_log = NULL;
+                       cfs_up_write(&cathandle->lgh_lock);
                        llog_close(env, loghandle);
 
                        LASSERT(index);
                        llog_cat_set_first_idx(cathandle, index);
-                       rc = llog_cancel_rec(env, cathandle, index);
-                        if (rc == 0)
-                                CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
-                                       " of catalog "LPX64"\n",
-                                       index, cathandle->lgh_id.lgl_oid);
-                }
-        }
-        cfs_up_write(&cathandle->lgh_lock);
+                       lrc = llog_cancel_rec(env, cathandle, index);
+                       if (lrc == 0)
+                               CDEBUG(D_RPCTRACE, "cancel plain log at index"
+                                      " %u of catalog "LPX64"\n",
+                                      index, cathandle->lgh_id.lgl_oid);
+               } else if (lrc == -ENOENT) {
+                       if (rc == 0) /* ENOENT shouldn't rewrite any error */
+                               rc = lrc;
+               } else if (lrc < 0) {
+                       failed++;
+                       rc = lrc;
+               }
+       }
+       if (rc)
+               CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
+                      cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
+                      rc);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_cancel_records);
 
index 1bd8837..c209cca 100644 (file)
@@ -241,7 +241,7 @@ static int llog_remove_log(const struct lu_env *env, struct llog_handle *cat,
         int rc, index = 0;
 
         ENTRY;
-        cfs_down_write(&cat->lgh_lock);
+
        rc = llog_cat_id2handle(env, cat, &log, logid);
         if (rc) {
                 CDEBUG(D_IOCTL, "cannot find log #"LPX64"#"LPX64"#%08x\n",
@@ -256,10 +256,13 @@ static int llog_remove_log(const struct lu_env *env, struct llog_handle *cat,
                CDEBUG(D_IOCTL, "cannot destroy log\n");
                GOTO(out, rc);
        }
+       cfs_down_write(&cat->lgh_lock);
+       if (cat->u.chd.chd_current_log == log)
+               cat->u.chd.chd_current_log = NULL;
+       cfs_up_write(&cat->lgh_lock);
        llog_cat_set_first_idx(cat, index);
        rc = llog_cancel_rec(env, cat, index);
 out:
-       cfs_up_write(&cat->lgh_lock);
        llog_close(env, log);
        RETURN(rc);
 
@@ -365,23 +368,24 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                 if (*endp != '\0')
                         GOTO(out_close, err = -EINVAL);
 
-                if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
-                        cfs_down_write(&handle->lgh_lock);
+               if (handle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) {
                        err = llog_cancel_rec(NULL, handle, cookie.lgc_index);
-                        cfs_up_write(&handle->lgh_lock);
-                        GOTO(out_close, err);
-                }
-
-                err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
-                if (err)
-                        GOTO(out_close, err);
-                cookie.lgc_lgl = plain;
+                       GOTO(out_close, err);
+               } else if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)) {
+                       GOTO(out_close, err = -EINVAL);
+               }
 
-                if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
-                        GOTO(out_close, err = -EINVAL);
+               if (data->ioc_inlbuf2 == NULL) /* catalog but no logid */
+                       GOTO(out_close, err = -ENOTTY);
 
+               err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
+               if (err)
+                       GOTO(out_close, err);
+               cookie.lgc_lgl = plain;
                err = llog_cat_cancel_records(NULL, handle, 1, &cookie);
-                GOTO(out_close, err);
+               if (err)
+                       GOTO(out_close, err);
+               break;
         }
         case OBD_IOC_LLOG_REMOVE: {
                 struct llog_logid plain;
@@ -410,6 +414,8 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                break;
        }
        default:
+               CERROR("%s: Unknown ioctl cmd %#x\n",
+                      ctxt->loc_obd->obd_name, cmd);
                GOTO(out_close, err = -ENOTTY);
         }
 
index 6046ec5..9b8536d 100644 (file)
@@ -216,7 +216,7 @@ static int llog_lvfs_write_rec(const struct lu_env *env,
                               struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
-                              void *buf, int idx)
+                              void *buf, int idx, struct thandle *th)
 {
         struct llog_log_hdr *llh;
         int reclen = rec->lrh_len, index, rc;
@@ -939,6 +939,21 @@ out1:
 }
 EXPORT_SYMBOL(llog_put_cat_list);
 
+static int llog_lvfs_declare_create(const struct lu_env *env,
+                                   struct llog_handle *res,
+                                   struct thandle *th)
+{
+       return 0;
+}
+
+static int llog_lvfs_declare_write_rec(const struct lu_env *env,
+                                      struct llog_handle *loghandle,
+                                      struct llog_rec_hdr *rec,
+                                      int idx, struct thandle *th)
+{
+       return 0;
+}
+
 struct llog_operations llog_lvfs_ops = {
        .lop_write_rec          = llog_lvfs_write_rec,
        .lop_next_block         = llog_lvfs_next_block,
@@ -949,6 +964,8 @@ struct llog_operations llog_lvfs_ops = {
        .lop_close              = llog_lvfs_close,
        .lop_open               = llog_lvfs_open,
        .lop_exist              = llog_lvfs_exist,
+       .lop_declare_create     = llog_lvfs_declare_create,
+       .lop_declare_write_rec  = llog_lvfs_declare_write_rec,
 };
 EXPORT_SYMBOL(llog_lvfs_ops);
 #else /* !__KERNEL__ */
index b84e9dc..adc8203 100644 (file)
@@ -246,9 +246,9 @@ int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp, int flags)
 }
 EXPORT_SYMBOL(llog_sync);
 
-int llog_add(const struct lu_env *env, struct llog_ctxt *ctxt,
-            struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-            struct llog_cookie *logcookies, int numcookies)
+int llog_obd_add(const struct lu_env *env, struct llog_ctxt *ctxt,
+                struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
+                struct llog_cookie *logcookies, int numcookies)
 {
         int raised, rc;
         ENTRY;
@@ -261,18 +261,17 @@ int llog_add(const struct lu_env *env, struct llog_ctxt *ctxt,
         if (ctxt->loc_flags & LLOG_CTXT_FLAG_UNINITIALIZED)
                 RETURN(-ENXIO);
 
-
-        CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
+       CTXT_CHECK_OP(ctxt, obd_add, -EOPNOTSUPP);
         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
         if (!raised)
                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-       rc = CTXTP(ctxt, add)(env, ctxt, rec, lsm, logcookies,
-                             numcookies);
+       rc = CTXTP(ctxt, obd_add)(env, ctxt, rec, lsm, logcookies,
+                                 numcookies);
         if (!raised)
                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
         RETURN(rc);
 }
-EXPORT_SYMBOL(llog_add);
+EXPORT_SYMBOL(llog_obd_add);
 
 int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
                struct lov_stripe_md *lsm, int count,
@@ -367,7 +366,7 @@ int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt,
 
         cathandle = ctxt->loc_handle;
         LASSERT(cathandle != NULL);
-       rc = llog_cat_add_rec(env, cathandle, rec, logcookies, NULL);
+       rc = llog_cat_add(env, cathandle, rec, logcookies, NULL);
         if (rc != 0 && rc != 1)
                 CERROR("write one catalog record failed: %d\n", rc);
         RETURN(rc);
index 2ab53d0..b119a83 100644 (file)
@@ -230,7 +230,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
        lgr.lgr_hdr.lrh_type = LLOG_GEN_REC;
 
        CWARN("3a: write one create_rec\n");
-       rc = llog_write_rec(env, llh,  &lgr.lgr_hdr, NULL, 0, NULL, -1);
+       rc = llog_write(env, llh,  &lgr.lgr_hdr, NULL, 0, NULL, -1);
        num_recs++;
        if (rc < 0) {
                CERROR("3a: write one log record failed: %d\n", rc);
@@ -249,7 +249,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
                hdr.lrh_len = 8;
                hdr.lrh_type = OBD_CFG_REC;
                memset(buf, 0, sizeof buf);
-               rc = llog_write_rec(env, llh, &hdr, NULL, 0, buf, -1);
+               rc = llog_write(env, llh, &hdr, NULL, 0, buf, -1);
                if (rc < 0) {
                        CERROR("3b: write 10 records failed at #%d: %d\n",
                               i + 1, rc);
@@ -264,8 +264,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 
        CWARN("3c: write 1000 more log records\n");
        for (i = 0; i < 1000; i++) {
-               rc = llog_write_rec(env, llh, &lgr.lgr_hdr, NULL, 0, NULL,
-                                   -1);
+               rc = llog_write(env, llh, &lgr.lgr_hdr, NULL, 0, NULL, -1);
                if (rc < 0) {
                        CERROR("3c: write 1000 records failed at #%d: %d\n",
                               i + 1, rc);
@@ -289,13 +288,11 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
                if ((i % 2) == 0) {
                        hdr.lrh_len = 24;
                        hdr.lrh_type = OBD_CFG_REC;
-                       rc = llog_write_rec(env, llh, &hdr, NULL, 0, buf_even,
-                                           -1);
+                       rc = llog_write(env, llh, &hdr, NULL, 0, buf_even, -1);
                } else {
                        hdr.lrh_len = 32;
                        hdr.lrh_type = OBD_CFG_REC;
-                       rc = llog_write_rec(env, llh, &hdr, NULL, 0, buf_odd,
-                                           -1);
+                       rc = llog_write(env, llh, &hdr, NULL, 0, buf_odd, -1);
                }
                if (rc == -ENOSPC) {
                        break;
@@ -361,7 +358,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        GOTO(out, rc);
 
        CWARN("4b: write 1 record into the catalog\n");
-       rc = llog_cat_add_rec(env, cath, &lmr.lmr_hdr, &cookie, NULL);
+       rc = llog_cat_add(env, cath, &lmr.lmr_hdr, &cookie, NULL);
        if (rc != 1) {
                CERROR("4b: write 1 catalog record failed at: %d\n", rc);
                GOTO(out, rc);
@@ -389,7 +386,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
        for (i = 0; i < LLOG_TEST_RECNUM; i++) {
-               rc = llog_cat_add_rec(env, cath, &lmr.lmr_hdr, NULL, NULL);
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL, NULL);
                if (rc) {
                        CERROR("4d: write %d records failed at #%d: %d\n",
                               LLOG_TEST_RECNUM, i + 1, rc);
@@ -412,7 +409,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        for (i = 0; i < 5; i++) {
                rec.lrh_len = buflen;
                rec.lrh_type = OBD_CFG_REC;
-               rc = llog_cat_add_rec(env, cath, &rec, NULL, buf);
+               rc = llog_cat_add(env, cath, &rec, NULL, buf);
                if (rc) {
                        CERROR("4e: write 5 records failed at #%d: %d\n",
                               i + 1, rc);
@@ -563,7 +560,7 @@ static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
        }
 
        CWARN("5d: add 1 record to the log with many canceled empty pages\n");
-       rc = llog_cat_add_rec(env, llh, &lmr.lmr_hdr, NULL, NULL);
+       rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL, NULL);
        if (rc) {
                CERROR("5d: add record to the log with many canceled empty "
                       "pages failed\n");
@@ -748,8 +745,8 @@ static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
                GOTO(out_close, rc);
        }
        for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr); i++) {
-               rc = llog_write_rec(env, llh, &llog_records.lrh, NULL, 0,
-                                   NULL, -1);
+               rc = llog_write(env, llh, &llog_records.lrh, NULL, 0,
+                               NULL, -1);
                if (rc == -ENOSPC) {
                        break;
                } else if (rc < 0) {
index ef25f36..360cf72 100644 (file)
@@ -2343,9 +2343,8 @@ obd_cleanup:
 static struct llog_operations filter_mds_ost_repl_logops;
 
 static struct llog_operations filter_size_orig_logops = {
-        .lop_setup   = llog_obd_origin_setup,
-        .lop_cleanup = llog_obd_origin_cleanup,
-        .lop_add     = llog_obd_origin_add
+       .lop_setup      = llog_obd_origin_setup,
+       .lop_cleanup    = llog_obd_origin_cleanup,
 };
 
 static int filter_olg_fini(struct obd_llog_group *olg)
index 626ab8f..7cc0b08 100644 (file)
@@ -94,7 +94,7 @@ int filter_log_sz_change(struct llog_handle *cathandle,
         lsc->lsc_fid = *mds_fid;
         lsc->lsc_ioepoch = ioepoch;
 
-       rc = llog_cat_add_rec(NULL, cathandle, &lsc->lsc_hdr, logcookie, NULL);
+       rc = llog_cat_add(NULL, cathandle, &lsc->lsc_hdr, logcookie, NULL);
         OBD_FREE(lsc, sizeof(*lsc));
 
         if (rc > 0) {
index d772f18..d85d2d1 100644 (file)
@@ -3775,7 +3775,7 @@ int __init osc_init(void)
         osc_mds_ost_orig_logops = llog_lvfs_ops;
         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
-        osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
+       osc_mds_ost_orig_logops.lop_obd_add = llog_obd_origin_add;
         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
 
         RETURN(rc);
index 679f99f..9db6799 100644 (file)
@@ -95,7 +95,7 @@ int llog_origin_connect(struct llog_ctxt *ctxt,
         }
 
         lgr->lgr_gen = ctxt->loc_gen;
-       rc = llog_add(NULL, ctxt, &lgr->lgr_hdr, NULL, NULL, 1);
+       rc = llog_obd_add(NULL, ctxt, &lgr->lgr_hdr, NULL, NULL, 1);
         OBD_FREE_PTR(lgr);
         rc1 = fsfilt_commit(ctxt->loc_exp->exp_obd, inode, handle, 0);
         if (rc != 1 || rc1 != 0) {
index a4ea3f1..a159a06 100644 (file)
@@ -207,8 +207,8 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
 
         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
        rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
-                            &body->lgd_saved_index, body->lgd_index,
-                            &body->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
+                            &repbody->lgd_saved_index, repbody->lgd_index,
+                            &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
        if (rc)
                GOTO(out_close, rc);
        EXIT;
index 6c74de2..6da9d20 100644 (file)
@@ -983,7 +983,7 @@ static int sptlrpc_record_rule_set(struct llog_handle *llh,
                                         lcfg->lcfg_buflens);
                 rec.lrh_len = llog_data_len(buflen);
                 rec.lrh_type = OBD_CFG_REC;
-               rc = llog_write_rec(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
+               rc = llog_write(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
                 if (rc)
                         CERROR("failed to write a rec: rc = %d\n", rc);
                 lustre_cfg_free(lcfg);