Whamcloud - gitweb
LU-1302 llog: introduce llog_open
authorMikhail Pershin <tappro@whamcloud.com>
Tue, 21 Aug 2012 17:44:55 +0000 (21:44 +0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 10 Sep 2012 02:48:03 +0000 (22:48 -0400)
- llog_open is pair to llog_close. Llog handle is allocated inside
  llog_open while llog_free_handle is part of llog_close. Each llog
  even non-existent should be opened and closed.
- llog_exist() is added to API to check if llog file exists
- llog_cat_put is renamed to llog_cat_close with cleaning up empty
  llogs like llog_obd_origin_cleanup did.

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: I6360b4c089ec1dde50289563447eefd3dfa4365a
Reviewed-on: http://review.whamcloud.com/3740
Tested-by: Hudson
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
18 files changed:
lustre/include/lustre_log.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/mgs/mgs_llog.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdclass/obd_config.c
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/sec_config.c

index 9471603..a6ce84f 100644 (file)
         snprintf(logname, sizeof(logname), "LOGS/%s", name)
 #define LLOG_EEMPTY 4711
 
         snprintf(logname, sizeof(logname), "LOGS/%s", name)
 #define LLOG_EEMPTY 4711
 
+enum llog_open_param {
+       LLOG_OPEN_EXISTS        = 0x0000,
+       LLOG_OPEN_NEW           = 0x0001,
+};
+
 struct plain_handle_data {
         cfs_list_t          phd_entry;
         struct llog_handle *phd_cat_handle;
 struct plain_handle_data {
         cfs_list_t          phd_entry;
         struct llog_handle *phd_cat_handle;
@@ -113,10 +118,8 @@ struct llog_handle;
 /* llog.c  -  general API */
 typedef int (*llog_cb_t)(const struct lu_env *env, struct llog_handle *lgh,
                         struct llog_rec_hdr *rec, void *data);
 /* llog.c  -  general API */
 typedef int (*llog_cb_t)(const struct lu_env *env, struct llog_handle *lgh,
                         struct llog_rec_hdr *rec, void *data);
-extern struct llog_handle *llog_alloc_handle(void);
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid);
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid);
-extern void llog_free_handle(struct llog_handle *handle);
 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
                 llog_cb_t cb, void *data, void *catdata);
 int llog_reverse_process(const struct lu_env *env,
 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
                 llog_cb_t cb, void *data, void *catdata);
 int llog_reverse_process(const struct lu_env *env,
@@ -124,6 +127,9 @@ int llog_reverse_process(const struct lu_env *env,
                         void *data, void *catdata);
 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                    int index);
                         void *data, void *catdata);
 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                    int index);
+int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
+             struct llog_handle **lgh, struct llog_logid *logid,
+             char *name, enum llog_open_param open_param);
 int llog_close(const struct lu_env *env, struct llog_handle *cathandle);
 int llog_get_size(struct llog_handle *loghandle);
 
 int llog_close(const struct lu_env *env, struct llog_handle *cathandle);
 int llog_get_size(struct llog_handle *loghandle);
 
@@ -177,7 +183,7 @@ struct llog_process_cat_args {
 
 int cat_cancel_cb(const struct lu_env *env, struct llog_handle *cathandle,
                  struct llog_rec_hdr *rec, void *data);
 
 int cat_cancel_cb(const struct lu_env *env, struct llog_handle *cathandle,
                  struct llog_rec_hdr *rec, void *data);
-int llog_cat_put(const struct lu_env *env, struct llog_handle *cathandle);
+int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle);
 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                     struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
                     void *buf);
 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                     struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
                     void *buf);
@@ -271,30 +277,59 @@ struct llog_operations {
                          struct llog_cookie *cookies, int flags);
        int (*lop_connect)(struct llog_ctxt *ctxt, struct llog_logid *logid,
                           struct llog_gen *gen, struct obd_uuid *uuid);
                          struct llog_cookie *cookies, int flags);
        int (*lop_connect)(struct llog_ctxt *ctxt, struct llog_logid *logid,
                           struct llog_gen *gen, struct obd_uuid *uuid);
-       int (*lop_create)(const struct lu_env *env, struct llog_ctxt *ctxt,
-                         struct llog_handle **h, struct llog_logid *logid,
-                         char *name);
+       /**
+        * Any llog file must be opened first using llog_open().  Llog can be
+        * opened by name, logid or without both, in last case the new logid
+        * will be generated.
+        */
+       int (*lop_open)(const struct lu_env *env, struct llog_handle *lgh,
+                       struct llog_logid *logid, char *name,
+                       enum llog_open_param);
+       /**
+        * Opened llog may not exist and this must be checked where needed using
+        * the llog_exist() call.
+        */
+       int (*lop_exist)(struct llog_handle *lgh);
+       /**
+        * Close llog file and calls llog_free_handle() implicitly.
+        * Any opened llog must be closed by llog_close() call.
+        */
+       int (*lop_close)(const struct lu_env *env, struct llog_handle *handle);
+       /**
+        * Create new llog file. The llog must be opened.
+        * Must be used only for local llog operations.
+        */
+       int (*lop_declare_create)(const struct lu_env *env,
+                                 struct llog_handle *handle,
+                                 struct thandle *th);
+       int (*lop_create)(const struct lu_env *env, struct llog_handle *handle,
+                         struct thandle *th);
+       /**
+        * write new record in llog. It appends records usually but can edit
+        * existing records too.
+        */
        int (*lop_write_rec)(const struct lu_env *env,
                             struct llog_handle *loghandle,
                             struct llog_rec_hdr *rec,
                             struct llog_cookie *logcookies, int numcookies,
        int (*lop_write_rec)(const struct lu_env *env,
                             struct llog_handle *loghandle,
                             struct llog_rec_hdr *rec,
                             struct llog_cookie *logcookies, int numcookies,
-                            void *, int idx);
+                            void *buf, int idx);
+       /**
+        * Add new record in llog catalog. Does the same as llog_write_rec()
+        * but using llog catalog.
+        */
        int (*lop_add)(const struct lu_env *env, struct llog_ctxt *ctxt,
                       struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                       struct llog_cookie *logcookies, int numcookies);
        int (*lop_add)(const struct lu_env *env, struct llog_ctxt *ctxt,
                       struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                       struct llog_cookie *logcookies, int numcookies);
-       int (*lop_close)(const struct lu_env *env, struct llog_handle *handle);
 };
 
 /* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
        cfs_rw_semaphore_t       lgh_lock;
 };
 
 /* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
        cfs_rw_semaphore_t       lgh_lock;
+       cfs_spinlock_t           lgh_hdr_lock; /* protect lgh_hdr data */
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr;
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr;
-       cfs_spinlock_t           lgh_hdr_lock; /* protect lgh_hdr data */
-       union {
-               struct file             *lgh_file;
-               struct dt_object        *lgh_obj;
-       };
+       struct file             *lgh_file;
+       struct dt_object        *lgh_obj;
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
        __u64                    lgh_cur_offset; /* used during llog_process */
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
        __u64                    lgh_cur_offset; /* used during llog_process */
@@ -468,10 +503,11 @@ static inline int llog_obd2ops(struct llog_ctxt *ctxt,
 static inline int llog_handle2ops(struct llog_handle *loghandle,
                                   struct llog_operations **lop)
 {
 static inline int llog_handle2ops(struct llog_handle *loghandle,
                                   struct llog_operations **lop)
 {
-        if (loghandle == NULL)
-                return -EINVAL;
+       if (loghandle == NULL || loghandle->lgh_logops == NULL)
+               return -EINVAL;
 
 
-        return llog_obd2ops(loghandle->lgh_ctxt, lop);
+       *lop = loghandle->lgh_logops;
+       return 0;
 }
 
 static inline int llog_data_len(int len)
 }
 
 static inline int llog_data_len(int len)
@@ -603,24 +639,6 @@ static inline int llog_write_rec(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static inline int llog_read_header(const struct lu_env *env,
-                                  struct llog_handle *handle)
-{
-       struct llog_operations *lop;
-       int rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(handle, &lop);
-       if (rc)
-               RETURN(rc);
-       if (lop->lop_read_header == NULL)
-               RETURN(-EOPNOTSUPP);
-
-       rc = lop->lop_read_header(env, handle);
-       RETURN(rc);
-}
-
 static inline int llog_destroy(const struct lu_env *env,
                               struct llog_handle *handle)
 {
 static inline int llog_destroy(const struct lu_env *env,
                               struct llog_handle *handle)
 {
@@ -679,52 +697,115 @@ static inline int llog_prev_block(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static inline int llog_create(const struct lu_env *env, struct llog_ctxt *ctxt,
-                             struct llog_handle **res,
-                             struct llog_logid *logid, char *name)
+static inline int llog_connect(struct llog_ctxt *ctxt,
+                              struct llog_logid *logid, struct llog_gen *gen,
+                              struct obd_uuid *uuid)
+{
+       struct llog_operations  *lop;
+       int                      rc;
+
+       ENTRY;
+
+       rc = llog_obd2ops(ctxt, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_connect == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       rc = lop->lop_connect(ctxt, logid, gen, uuid);
+       RETURN(rc);
+}
+
+/**
+ * new llog API
+ *
+ * API functions:
+ *      llog_open - open llog, may not exist
+ *      llog_exist - check if llog exists
+ *      llog_close - close opened llog, pair for open, frees llog_handle
+ *      llog_declare_create - declare llog creation
+ *      llog_create - create new llog on disk, need transaction handle
+ */
+static inline int llog_exist(struct llog_handle *loghandle)
 {
        struct llog_operations *lop;
        int raised, rc;
 
        ENTRY;
 
 {
        struct llog_operations *lop;
        int raised, rc;
 
        ENTRY;
 
-       rc = llog_obd2ops(ctxt, &lop);
+       rc = llog_handle2ops(loghandle, &lop);
        if (rc)
                RETURN(rc);
        if (rc)
                RETURN(rc);
-       if (lop->lop_create == NULL)
+       if (lop->lop_exist == NULL)
+               RETURN(-EOPNOTSUPP);
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lop->lop_exist(loghandle);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
+}
+
+static inline int llog_declare_create(const struct lu_env *env,
+                                     struct llog_handle *loghandle,
+                                     struct thandle *th)
+{
+       struct llog_operations  *lop;
+       int                      raised, rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(loghandle, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_declare_create == NULL)
                RETURN(-EOPNOTSUPP);
 
        raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
        if (!raised)
                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
                RETURN(-EOPNOTSUPP);
 
        raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
        if (!raised)
                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-       rc = lop->lop_create(env, ctxt, res, logid, name);
+       rc = lop->lop_declare_create(env, loghandle, th);
        if (!raised)
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        RETURN(rc);
 }
 
        if (!raised)
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        RETURN(rc);
 }
 
-static inline int llog_connect(struct llog_ctxt *ctxt,
-                               struct llog_logid *logid, struct llog_gen *gen,
-                               struct obd_uuid *uuid)
+static inline int llog_create(const struct lu_env *env,
+                             struct llog_handle *handle, struct thandle *th)
 {
 {
-        struct llog_operations *lop;
-        int rc;
-        ENTRY;
-
-        rc = llog_obd2ops(ctxt, &lop);
-        if (rc)
-                RETURN(rc);
-        if (lop->lop_connect == NULL)
-                RETURN(-EOPNOTSUPP);
-
-        rc = lop->lop_connect(ctxt, logid, gen, uuid);
-        RETURN(rc);
+       struct llog_operations  *lop;
+       int                      raised, rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_create == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = lop->lop_create(env, handle, th);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       RETURN(rc);
 }
 
 int lustre_process_log(struct super_block *sb, char *logname,
                        struct config_llog_instance *cfg);
 int lustre_end_log(struct super_block *sb, char *logname,
                    struct config_llog_instance *cfg);
 }
 
 int lustre_process_log(struct super_block *sb, char *logname,
                        struct config_llog_instance *cfg);
 int lustre_end_log(struct super_block *sb, char *logname,
                    struct config_llog_instance *cfg);
+int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
+                    struct llog_handle **res, struct llog_logid *logid,
+                    char *name);
+int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
+              struct llog_logid *logid, char *name);
+int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
+              struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
+              int cookiecount, void *buf, int idx);
 
 /** @} log */
 
 
 /** @} log */
 
index a90fa50..92b41ac 100644 (file)
@@ -2278,7 +2278,7 @@ static inline void ptlrpc_lprocfs_brw(struct ptlrpc_request *req, int bytes) {}
 /** @} */
 
 /* ptlrpc/llog_server.c */
 /** @} */
 
 /* ptlrpc/llog_server.c */
-int llog_origin_handle_create(struct ptlrpc_request *req);
+int llog_origin_handle_open(struct ptlrpc_request *req);
 int llog_origin_handle_destroy(struct ptlrpc_request *req);
 int llog_origin_handle_prev_block(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
 int llog_origin_handle_destroy(struct ptlrpc_request *req);
 int llog_origin_handle_prev_block(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
index d4134b9..740eae7 100644 (file)
@@ -1963,7 +1963,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
                         RETURN(0);
                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
                         RETURN(0);
-                rc = llog_origin_handle_create(req);
+               rc = llog_origin_handle_open(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
index 8e99fc4..02ef0f2 100644 (file)
@@ -1289,16 +1289,18 @@ static int mdc_changelog_send_thread(void *csdata)
         ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
         if (ctxt == NULL)
                 GOTO(out, rc = -ENOENT);
         ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
         if (ctxt == NULL)
                 GOTO(out, rc = -ENOENT);
-       rc = llog_create(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG);
-        if (rc) {
-                CERROR("llog_create() failed %d\n", rc);
-                GOTO(out, rc);
-        }
+       rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
+                      LLOG_OPEN_EXISTS);
+       if (rc) {
+               CERROR("%s: fail to open changelog catalog: rc = %d\n",
+                      cs->cs_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
-        if (rc) {
-                CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(out, rc);
-        }
+       if (rc) {
+               CERROR("llog_init_handle failed %d\n", rc);
+               GOTO(out, rc);
+       }
 
        rc = llog_cat_process(NULL, llh, changelog_show_cb, cs, 0, 0);
 
 
        rc = llog_cat_process(NULL, llh, changelog_show_cb, cs, 0, 0);
 
@@ -1312,7 +1314,7 @@ static int mdc_changelog_send_thread(void *csdata)
 out:
         cfs_put_file(cs->cs_fp);
         if (llh)
 out:
         cfs_put_file(cs->cs_fp);
         if (llh)
-               llog_cat_put(NULL, llh);
+               llog_cat_close(NULL, llh);
         if (ctxt)
                 llog_ctxt_put(ctxt);
         if (cs->cs_buf)
         if (ctxt)
                 llog_ctxt_put(ctxt);
         if (cs->cs_buf)
index 36d6ef5..d2e5312 100644 (file)
@@ -2021,11 +2021,11 @@ static int mdt_llog_ctxt_unclone(const struct lu_env *env,
 
 static int mdt_llog_create(struct mdt_thread_info *info)
 {
 
 static int mdt_llog_create(struct mdt_thread_info *info)
 {
-        int rc;
+       int rc;
 
 
-        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
-        rc = llog_origin_handle_create(mdt_info_req(info));
-        return (rc < 0 ? err_serious(rc) : rc);
+       req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
+       rc = llog_origin_handle_open(mdt_info_req(info));
+       return (rc < 0 ? err_serious(rc) : rc);
 }
 
 static int mdt_llog_destroy(struct mdt_thread_info *info)
 }
 
 static int mdt_llog_destroy(struct mdt_thread_info *info)
index 4ed45a0..94fe3bd 100644 (file)
@@ -1357,7 +1357,7 @@ static int mgc_apply_recover_logs(struct obd_device *mgc,
                 if (obd == NULL) {
                         CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n",
                                mgc->obd_name, obdname);
                 if (obd == NULL) {
                         CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n",
                                mgc->obd_name, obdname);
-
+                       rc = 0;
                         /* this is a safe race, when the ost is starting up...*/
                         continue;
                 }
                         /* this is a safe race, when the ost is starting up...*/
                         continue;
                 }
@@ -1568,11 +1568,13 @@ static int mgc_llog_is_empty(struct obd_device *obd, struct llog_ctxt *ctxt,
        int                      rc = 0;
 
        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        int                      rc = 0;
 
        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-       rc = llog_create(NULL, ctxt, &llh, NULL, name);
+       rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
        if (rc == 0) {
                llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
                rc = llog_get_size(llh);
                llog_close(NULL, llh);
        if (rc == 0) {
                llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
                rc = llog_get_size(llh);
                llog_close(NULL, llh);
+       } else if (rc == -ENOENT) {
+               rc = 0;
        }
        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        /* header is record 1 */
        }
        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        /* header is record 1 */
@@ -1621,39 +1623,37 @@ static int mgc_copy_llog(struct obd_device *obd, struct llog_ctxt *rctxt,
         sprintf(temp_log, "%sT", logname);
 
         /* Make sure there's no old temp log */
         sprintf(temp_log, "%sT", logname);
 
         /* Make sure there's no old temp log */
-       rc = llog_create(NULL, lctxt, &local_llh, NULL, temp_log);
-       if (rc)
+       rc = llog_erase(NULL, lctxt, NULL, temp_log);
+       if (rc < 0 && rc != -ENOENT)
                GOTO(out, rc);
                GOTO(out, rc);
-       rc = llog_init_handle(NULL, local_llh, LLOG_F_IS_PLAIN, NULL);
+
+       /* open local log */
+       rc = llog_open_create(NULL, lctxt, &local_llh, NULL, temp_log);
        if (rc)
                GOTO(out, rc);
        if (rc)
                GOTO(out, rc);
-       rc = llog_destroy(NULL, local_llh);
-        llog_free_handle(local_llh);
-        if (rc)
-                GOTO(out, rc);
 
 
-        /* open local log */
-       rc = llog_create(NULL, lctxt, &local_llh, NULL, temp_log);
-        if (rc)
-                GOTO(out, rc);
-
-        /* set the log header uuid for fun */
-        OBD_ALLOC_PTR(uuid);
-        obd_str2uuid(uuid, logname);
+       /* set the log header uuid for fun */
+       OBD_ALLOC_PTR(uuid);
+       obd_str2uuid(uuid, logname);
        rc = llog_init_handle(NULL, local_llh, LLOG_F_IS_PLAIN, uuid);
        rc = llog_init_handle(NULL, local_llh, LLOG_F_IS_PLAIN, uuid);
-        OBD_FREE_PTR(uuid);
-        if (rc)
-                GOTO(out_closel, rc);
-
-        /* open remote log */
-       rc = llog_create(NULL, rctxt, &remote_llh, NULL, logname);
+       OBD_FREE_PTR(uuid);
        if (rc)
                GOTO(out_closel, rc);
        if (rc)
                GOTO(out_closel, rc);
+
+       /* open remote log */
+       rc = llog_open(NULL, rctxt, &remote_llh, NULL, logname,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               if (rc == -ENOENT)
+                       rc = 0;
+               GOTO(out_closel, rc);
+       }
+
        rc = llog_init_handle(NULL, remote_llh, LLOG_F_IS_PLAIN, NULL);
        rc = llog_init_handle(NULL, remote_llh, LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(out_closer, rc);
+       if (rc)
+               GOTO(out_closer, rc);
 
 
-        /* Copy remote log */
+       /* Copy remote log */
        rc = llog_process(NULL, remote_llh, mgc_copy_handler,
                          (void *)local_llh, NULL);
 
        rc = llog_process(NULL, remote_llh, mgc_copy_handler,
                          (void *)local_llh, NULL);
 
@@ -1668,8 +1668,9 @@ out_closel:
 
         /* We've copied the remote log to the local temp log, now
            replace the old local log with the temp log. */
 
         /* We've copied the remote log to the local temp log, now
            replace the old local log with the temp log. */
-        if (!rc) {
+       if (rc == 0) {
                 struct client_obd *cli = &obd->u.cli;
                 struct client_obd *cli = &obd->u.cli;
+
                 LASSERT(cli);
                 LASSERT(cli->cl_mgc_configs_dir);
                 rc = lustre_rename(cli->cl_mgc_configs_dir, cli->cl_mgc_vfsmnt,
                 LASSERT(cli);
                 LASSERT(cli->cl_mgc_configs_dir);
                 rc = lustre_rename(cli->cl_mgc_configs_dir, cli->cl_mgc_vfsmnt,
index 4c88968..8cd2799 100644 (file)
@@ -907,9 +907,9 @@ int mgs_handle(struct ptlrpc_request *req)
                 break;
 
         case LLOG_ORIGIN_HANDLE_CREATE:
                 break;
 
         case LLOG_ORIGIN_HANDLE_CREATE:
-                DEBUG_REQ(D_MGS, req, "llog_init");
-                req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
-                rc = llog_origin_handle_create(req);
+               DEBUG_REQ(D_MGS, req, "llog_open");
+               req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
+               rc = llog_origin_handle_open(req);
                 if (rc == 0)
                         (void)mgs_handle_fslog_hack(req);
                 break;
                 if (rc == 0)
                         (void)mgs_handle_fslog_hack(req);
                 break;
@@ -929,21 +929,18 @@ int mgs_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_MGS, req, "llog close");
                 rc = llog_origin_handle_close(req);
                 break;
                 DEBUG_REQ(D_MGS, req, "llog close");
                 rc = llog_origin_handle_close(req);
                 break;
-        case LLOG_CATINFO:
-               DEBUG_REQ(D_MGS, req, "llog catinfo");
+       default:
                rc = -EOPNOTSUPP;
                rc = -EOPNOTSUPP;
-               break;
-        default:
-                req->rq_status = -ENOTSUPP;
-                rc = ptlrpc_error(req);
-                RETURN(rc);
         }
 
         LASSERT(current->journal_info == NULL);
         }
 
         LASSERT(current->journal_info == NULL);
-
-        if (rc)
-                CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
-
+       if (rc) {
+               DEBUG_REQ(D_MGS, req, "MGS fail to handle opc = %d: rc = %d\n",
+                         opc, rc);
+               req->rq_status = rc;
+               rc = ptlrpc_error(req);
+               RETURN(rc);
+       }
 out:
         target_send_reply(req, rc, fail);
         RETURN(0);
 out:
         target_send_reply(req, rc, fail);
         RETURN(0);
index f41532a..afb1e6b 100644 (file)
@@ -276,19 +276,18 @@ static int mgs_get_fsdb_from_llog(struct obd_device *obd, struct fs_db *fsdb)
         name_create(&logname, fsdb->fsdb_name, "-client");
         cfs_mutex_lock(&fsdb->fsdb_mutex);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         name_create(&logname, fsdb->fsdb_name, "-client");
         cfs_mutex_lock(&fsdb->fsdb_mutex);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-       rc = llog_create(NULL, ctxt, &loghandle, NULL, logname);
+       rc = llog_open_create(NULL, ctxt, &loghandle, NULL, logname);
        if (rc)
                GOTO(out_pop, rc);
 
        rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
        if (rc)
                GOTO(out_pop, rc);
 
        rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(out_close, rc);
+       if (rc)
+               GOTO(out_close, rc);
 
 
-        if (llog_get_size(loghandle) <= 1)
-                cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
+       if (llog_get_size(loghandle) <= 1)
+               cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
 
 
-       rc = llog_process(NULL, loghandle, mgs_fsdb_handler, (void *) &d,
-                         NULL);
+       rc = llog_process(NULL, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
        CDEBUG(D_INFO, "get_db = %d\n", rc);
 out_close:
        rc2 = llog_close(NULL, loghandle);
        CDEBUG(D_INFO, "get_db = %d\n", rc);
 out_close:
        rc2 = llog_close(NULL, loghandle);
@@ -682,13 +681,16 @@ static int mgs_modify(struct obd_device *obd, struct fs_db *fsdb,
         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
                flags);
 
         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
                flags);
 
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
         LASSERT(ctxt != NULL);
         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
         LASSERT(ctxt != NULL);
-       rc = llog_create(NULL, ctxt, &loghandle, NULL, logname);
-        if (rc)
-                GOTO(out_pop, rc);
+       push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+       rc = llog_open(NULL, ctxt, &loghandle, NULL, logname,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               if (rc == -ENOENT)
+                       rc = 0;
+               GOTO(out_pop, rc);
+       }
 
        rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
         if (rc)
 
        rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
         if (rc)
@@ -787,7 +789,7 @@ static int record_base(struct obd_device *obd, struct llog_handle *llh,
                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
                        cmd, s1, s2, s3, s4);
         }
                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
                        cmd, s1, s2, s3, s4);
         }
-        return(rc);
+       return rc;
 }
 
 
 }
 
 
@@ -928,20 +930,21 @@ static int record_start_log(struct obd_device *obd,
                 GOTO(out, rc = -ENODEV);
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 GOTO(out, rc = -ENODEV);
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-       rc = llog_create(NULL, ctxt, llh, NULL, name);
-       if (rc == 0)
-               llog_init_handle(NULL, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
-       else
+       rc = llog_open_create(NULL, ctxt, llh, NULL, name);
+       if (rc)
+               GOTO(out_ctxt, rc);
+       rc = llog_init_handle(NULL, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
+       if (rc) {
+               llog_close(NULL, *llh);
                *llh = NULL;
                *llh = NULL;
-
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-
+       }
+out_ctxt:
+       pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
 out:
 out:
-        if (rc) {
-                CERROR("Can't start log %s: %d\n", name, rc);
-        }
-        RETURN(rc);
+       if (rc)
+               CERROR("Can't start log %s: %d\n", name, rc);
+       RETURN(rc);
 }
 
 static int record_end_log(struct obd_device *obd, struct llog_handle **llh)
 }
 
 static int record_end_log(struct obd_device *obd, struct llog_handle **llh)
@@ -968,16 +971,25 @@ static int mgs_log_is_empty(struct obd_device *obd, char *name)
         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
         LASSERT(ctxt != NULL);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
         LASSERT(ctxt != NULL);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = llog_create(NULL, ctxt, &llh, NULL, name);
-        if (rc == 0) {
-               llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
-               rc = llog_get_size(llh);
-               llog_close(NULL, llh);
-        }
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-        /* header is record 1 */
-        return(rc <= 1);
+       rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               if (rc == -ENOENT)
+                       rc = 0;
+               GOTO(out_ctxt, rc);
+       }
+
+       llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
+       if (rc)
+               GOTO(out_close, rc);
+       rc = llog_get_size(llh);
+
+out_close:
+       llog_close(NULL, llh);
+out_ctxt:
+       pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
+       /* header is record 1 */
+       return (rc <= 1);
 }
 
 /******************** config "macros" *********************/
 }
 
 /******************** config "macros" *********************/
@@ -1256,7 +1268,8 @@ static int mgs_steal_llog_for_mdt_from_client(struct obd_device *obd,
         struct lvfs_run_ctxt saved;
         struct mgs_target_info *tmti;
         struct llog_ctxt *ctxt;
         struct lvfs_run_ctxt saved;
         struct mgs_target_info *tmti;
         struct llog_ctxt *ctxt;
-        int rc, rc2;
+       int rc;
+
         ENTRY;
 
         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
         ENTRY;
 
         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
@@ -1264,33 +1277,36 @@ static int mgs_steal_llog_for_mdt_from_client(struct obd_device *obd,
 
         OBD_ALLOC_PTR(tmti);
         if (tmti == NULL)
 
         OBD_ALLOC_PTR(tmti);
         if (tmti == NULL)
-                RETURN(-ENOMEM);
+               GOTO(out_ctxt, rc = -ENOMEM);
 
 
-        comp->comp_tmti = tmti;
-        comp->comp_obd = obd;
+       comp->comp_tmti = tmti;
+       comp->comp_obd = obd;
 
 
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+       push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
 
-       rc = llog_create(NULL, ctxt, &loghandle, NULL, client_name);
-        if (rc)
-                GOTO(out_pop, rc);
+       rc = llog_open(NULL, ctxt, &loghandle, NULL, client_name,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               if (rc == -ENOENT)
+                       rc = 0;
+               GOTO(out_pop, rc);
+       }
 
        rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
 
        rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(out_close, rc);
+       if (rc)
+               GOTO(out_close, rc);
 
        rc = llog_process(NULL, loghandle, mgs_steal_llog_handler,
                          (void *)comp, NULL);
 
        rc = llog_process(NULL, loghandle, mgs_steal_llog_handler,
                          (void *)comp, NULL);
-        CDEBUG(D_MGS, "steal llog re = %d\n", rc);
+       CDEBUG(D_MGS, "steal llog re = %d\n", rc);
 out_close:
 out_close:
-       rc2 = llog_close(NULL, loghandle);
-        if (!rc)
-                rc = rc2;
+       llog_close(NULL, loghandle);
 out_pop:
 out_pop:
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        OBD_FREE_PTR(tmti);
-        llog_ctxt_put(ctxt);
-        RETURN(rc);
+       pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+       OBD_FREE_PTR(tmti);
+out_ctxt:
+       llog_ctxt_put(ctxt);
+       RETURN(rc);
 }
 
 /* lmv is the second thing for client logs */
 }
 
 /* lmv is the second thing for client logs */
@@ -2445,9 +2461,13 @@ int mgs_get_fsdb_srpc_from_llog(struct obd_device *obd,
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-       rc = llog_create(NULL, ctxt, &llh, NULL, logname);
-        if (rc)
-                GOTO(out_pop, rc);
+       rc = llog_open(NULL, ctxt, &llh, NULL, logname,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               if (rc == -ENOENT)
+                       rc = 0;
+               GOTO(out_pop, rc);
+       }
 
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
         if (rc)
 
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
         if (rc)
@@ -2459,7 +2479,7 @@ int mgs_get_fsdb_srpc_from_llog(struct obd_device *obd,
         msrd.msrd_fsdb = fsdb;
         msrd.msrd_skip = 0;
 
         msrd.msrd_fsdb = fsdb;
         msrd.msrd_skip = 0;
 
-       rc = llog_process(NULL, llh, mgs_srpc_read_handler, (void *) &msrd,
+       rc = llog_process(NULL, llh, mgs_srpc_read_handler, (void *)&msrd,
                          NULL);
 
 out_close:
                          NULL);
 
 out_close:
@@ -2958,28 +2978,30 @@ int mgs_upgrade_sv_14(struct obd_device *obd, struct mgs_target_info *mti,
 
 int mgs_erase_log(struct obd_device *obd, char *name)
 {
 
 int mgs_erase_log(struct obd_device *obd, char *name)
 {
-        struct lvfs_run_ctxt saved;
-        struct llog_ctxt *ctxt;
-        struct llog_handle *llh;
-        int rc = 0;
+       struct lvfs_run_ctxt     saved;
+       struct llog_ctxt        *ctxt;
+       int                      rc = 0;
 
 
-        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-        LASSERT(ctxt != NULL);
-
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-       rc = llog_create(NULL, ctxt, &llh, NULL, name);
-       if (rc == 0) {
-               llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
-               rc = llog_destroy(NULL, llh);
-                llog_free_handle(llh);
-        }
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
+       ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+       if (ctxt == NULL) {
+               CERROR("%s: MGS config context doesn't exist\n",
+                      obd->obd_name);
+               rc = -ENODEV;
+       } else {
+               push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+               rc = llog_erase(NULL, ctxt, NULL, name);
+               /* llog may not exist */
+               if (rc == -ENOENT)
+                       rc = 0;
+               pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+               llog_ctxt_put(ctxt);
+       }
 
 
-        if (rc)
-                CERROR("failed to clear log %s: %d\n", name, rc);
+       if (rc)
+               CERROR("%s: failed to clear log %s: %d\n", obd->obd_name,
+                      name, rc);
 
 
-        return(rc);
+       return rc;
 }
 
 /* erase all logs for the given fs */
 }
 
 /* erase all logs for the given fs */
index 4411e0c..ee4d1f4 100644 (file)
 #include <lustre_log.h>
 #include "llog_internal.h"
 
 #include <lustre_log.h>
 #include "llog_internal.h"
 
-/* Allocate a new log or catalog handle */
+/*
+ * Allocate a new log or catalog handle
+ * Used inside llog_open().
+ */
 struct llog_handle *llog_alloc_handle(void)
 {
 struct llog_handle *llog_alloc_handle(void)
 {
-        struct llog_handle *loghandle;
-        ENTRY;
+       struct llog_handle *loghandle;
 
        OBD_ALLOC_PTR(loghandle);
 
        OBD_ALLOC_PTR(loghandle);
-        if (loghandle == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+       if (loghandle == NULL)
+               return ERR_PTR(-ENOMEM);
 
 
-        cfs_init_rwsem(&loghandle->lgh_lock);
+       cfs_init_rwsem(&loghandle->lgh_lock);
        cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
        CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
 
        cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
        CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
 
-        RETURN(loghandle);
+       return loghandle;
 }
 }
-EXPORT_SYMBOL(llog_alloc_handle);
-
 
 
+/*
+ * Free llog handle and header data if exists. Used in llog_close() only
+ */
 void llog_free_handle(struct llog_handle *loghandle)
 {
 void llog_free_handle(struct llog_handle *loghandle)
 {
-        if (!loghandle)
-                return;
-
-        if (!loghandle->lgh_hdr)
-                goto out;
-        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
-                cfs_list_del_init(&loghandle->u.phd.phd_entry);
-        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
-                LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
+       if (!loghandle)
+               return;
+
+       if (!loghandle->lgh_hdr)
+               goto out;
+       if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
+               cfs_list_del_init(&loghandle->u.phd.phd_entry);
+       if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
+               LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
        LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
        OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
 
 out:
        OBD_FREE_PTR(loghandle);
 }
        LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
        OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
 
 out:
        OBD_FREE_PTR(loghandle);
 }
-EXPORT_SYMBOL(llog_free_handle);
 
 /* returns negative on error; 0 if success; 1 if success & log destroyed */
 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
 /* returns negative on error; 0 if success; 1 if success & log destroyed */
 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
@@ -154,58 +156,99 @@ out_err:
 }
 EXPORT_SYMBOL(llog_cancel_rec);
 
 }
 EXPORT_SYMBOL(llog_cancel_rec);
 
+static int llog_read_header(const struct lu_env *env,
+                           struct llog_handle *handle,
+                           struct obd_uuid *uuid)
+{
+       struct llog_operations *lop;
+       int rc;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc)
+               RETURN(rc);
+
+       if (lop->lop_read_header == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       rc = lop->lop_read_header(env, handle);
+       if (rc == LLOG_EEMPTY) {
+               struct llog_log_hdr *llh = handle->lgh_hdr;
+
+               handle->lgh_last_idx = 0; /* header is record with index 0 */
+               llh->llh_count = 1;         /* for the header record */
+               llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
+               llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
+               llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
+               llh->llh_timestamp = cfs_time_current_sec();
+               if (uuid)
+                       memcpy(&llh->llh_tgtuuid, uuid,
+                              sizeof(llh->llh_tgtuuid));
+               llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+               ext2_set_bit(0, llh->llh_bitmap);
+               rc = 0;
+       }
+       return rc;
+}
+
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid)
 {
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid)
 {
-        int rc;
-        struct llog_log_hdr *llh;
-        ENTRY;
-        LASSERT(handle->lgh_hdr == NULL);
+       struct llog_log_hdr     *llh;
+       int                      rc;
 
 
-       OBD_ALLOC_PTR(llh);
-        if (llh == NULL)
-                RETURN(-ENOMEM);
-        handle->lgh_hdr = llh;
-        /* first assign flags to use llog_client_ops */
-        llh->llh_flags = flags;
-       rc = llog_read_header(env, handle);
-        if (rc == 0) {
-                flags = llh->llh_flags;
-                if (uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid)) {
-                        CERROR("uuid mismatch: %s/%s\n", (char *)uuid->uuid,
-                               (char *)llh->llh_tgtuuid.uuid);
-                        rc = -EEXIST;
-                }
-                GOTO(out, rc);
-        } else if (rc != LLOG_EEMPTY || !flags) {
-                /* set a pesudo flag for initialization */
-                flags = LLOG_F_IS_CAT;
-                GOTO(out, rc);
-        }
-        rc = 0;
-
-        handle->lgh_last_idx = 0; /* header is record with index 0 */
-        llh->llh_count = 1;         /* for the header record */
-        llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
-        llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
-        llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
-        llh->llh_timestamp = cfs_time_current_sec();
-        if (uuid)
-                memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
-        llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
-        ext2_set_bit(0, llh->llh_bitmap);
+       ENTRY;
+       LASSERT(handle->lgh_hdr == NULL);
 
 
-out:
-        if (flags & LLOG_F_IS_CAT) {
+       OBD_ALLOC_PTR(llh);
+       if (llh == NULL)
+               RETURN(-ENOMEM);
+       handle->lgh_hdr = llh;
+       /* first assign flags to use llog_client_ops */
+       llh->llh_flags = flags;
+       rc = llog_read_header(env, handle, uuid);
+       if (rc == 0) {
+               if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
+                             flags & LLOG_F_IS_CAT) ||
+                            (llh->llh_flags & LLOG_F_IS_CAT &&
+                             flags & LLOG_F_IS_PLAIN))) {
+                       CERROR("%s: llog type is %s but initializing %s\n",
+                              handle->lgh_ctxt->loc_obd->obd_name,
+                              llh->llh_flags & LLOG_F_IS_CAT ?
+                              "catalog" : "plain",
+                              flags & LLOG_F_IS_CAT ? "catalog" : "plain");
+                       GOTO(out, rc = -EINVAL);
+               } else if (llh->llh_flags &
+                          (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
+                       /*
+                        * it is possible to open llog without specifying llog
+                        * type so it is taken from llh_flags
+                        */
+                       flags = llh->llh_flags;
+               } else {
+                       /* for some reason the llh_flags has no type set */
+                       CERROR("llog type is not specified!\n");
+                       GOTO(out, rc = -EINVAL);
+               }
+               if (unlikely(uuid &&
+                            !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
+                       CERROR("%s: llog uuid mismatch: %s/%s\n",
+                              handle->lgh_ctxt->loc_obd->obd_name,
+                              (char *)uuid->uuid,
+                              (char *)llh->llh_tgtuuid.uuid);
+                       GOTO(out, rc = -EEXIST);
+               }
+       }
+       if (flags & LLOG_F_IS_CAT) {
                LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
                CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
                llh->llh_size = sizeof(struct llog_logid_rec);
        } else if (!(flags & LLOG_F_IS_PLAIN)) {
                LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
                CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
                llh->llh_size = sizeof(struct llog_logid_rec);
        } else if (!(flags & LLOG_F_IS_PLAIN)) {
-               CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
+               CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
+                      handle->lgh_ctxt->loc_obd->obd_name,
                       flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
                rc = -EINVAL;
        }
                       flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
                rc = -EINVAL;
        }
-
+out:
        if (rc) {
                OBD_FREE_PTR(llh);
                handle->lgh_hdr = NULL;
        if (rc) {
                OBD_FREE_PTR(llh);
                handle->lgh_hdr = NULL;
@@ -214,24 +257,6 @@ out:
 }
 EXPORT_SYMBOL(llog_init_handle);
 
 }
 EXPORT_SYMBOL(llog_init_handle);
 
-int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
-{
-        struct llog_operations *lop;
-        int rc;
-        ENTRY;
-
-        rc = llog_handle2ops(loghandle, &lop);
-        if (rc)
-                GOTO(out, rc);
-        if (lop->lop_close == NULL)
-                GOTO(out, -EOPNOTSUPP);
-       rc = lop->lop_close(env, loghandle);
- out:
-        llog_free_handle(loghandle);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_close);
-
 static int llog_process_thread(void *arg)
 {
        struct llog_process_info        *lpi = arg;
 static int llog_process_thread(void *arg)
 {
        struct llog_process_info        *lpi = arg;
@@ -441,7 +466,7 @@ int llog_process_or_fork(const struct lu_env *env,
 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
                 llog_cb_t cb, void *data, void *catdata)
 {
 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
                 llog_cb_t cb, void *data, void *catdata)
 {
-       return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
+       return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
 }
 EXPORT_SYMBOL(llog_process);
 
 }
 EXPORT_SYMBOL(llog_process);
 
@@ -542,3 +567,137 @@ out:
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_reverse_process);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_reverse_process);
+
+/**
+ * Helper function to open llog or create it if doesn't exist.
+ * It hides all transaction handling from caller.
+ */
+int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
+                    struct llog_handle **res, struct llog_logid *logid,
+                    char *name)
+{
+       struct thandle  *th;
+       int              rc;
+
+       ENTRY;
+
+       rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
+       if (rc)
+               RETURN(rc);
+
+       if (llog_exist(*res))
+               RETURN(0);
+
+       if ((*res)->lgh_obj != NULL) {
+               struct dt_device *d;
+
+               d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
+
+               th = dt_trans_create(env, d);
+               if (IS_ERR(th))
+                       GOTO(out, rc = PTR_ERR(th));
+
+               rc = llog_declare_create(env, *res, th);
+               if (rc == 0) {
+                       rc = dt_trans_start_local(env, d, th);
+                       if (rc == 0)
+                               rc = llog_create(env, *res, th);
+               }
+               dt_trans_stop(env, d, th);
+       } else {
+               /* lvfs compat code */
+               LASSERT((*res)->lgh_file == NULL);
+               rc = llog_create(env, *res, NULL);
+       }
+out:
+       if (rc)
+               llog_close(env, *res);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_open_create);
+
+/**
+ * Helper function to delete existent llog.
+ */
+int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
+              struct llog_logid *logid, char *name)
+{
+       struct llog_handle      *handle;
+       int                      rc = 0, rc2;
+
+       ENTRY;
+
+       /* nothing to erase */
+       if (name == NULL && logid == NULL)
+               RETURN(0);
+
+       rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
+       if (rc < 0)
+               RETURN(rc);
+
+       rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
+       if (rc == 0)
+               rc = llog_destroy(env, handle);
+
+       rc2 = llog_close(env, handle);
+       if (rc == 0)
+               rc = rc2;
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_erase);
+
+int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
+             struct llog_handle **lgh, struct llog_logid *logid,
+             char *name, enum llog_open_param open_param)
+{
+       int      raised;
+       int      rc;
+
+       ENTRY;
+
+       LASSERT(ctxt);
+       LASSERT(ctxt->loc_logops);
+
+       if (ctxt->loc_logops->lop_open == NULL) {
+               *lgh = NULL;
+               RETURN(-EOPNOTSUPP);
+       }
+
+       *lgh = llog_alloc_handle();
+       if (*lgh == NULL)
+               RETURN(-ENOMEM);
+       (*lgh)->lgh_ctxt = ctxt;
+       (*lgh)->lgh_logops = ctxt->loc_logops;
+
+       raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
+       if (!raised)
+               cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+       rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
+       if (!raised)
+               cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+       if (rc) {
+               llog_free_handle(*lgh);
+               *lgh = NULL;
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_open);
+
+int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
+{
+       struct llog_operations  *lop;
+       int                      rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(loghandle, &lop);
+       if (rc)
+               GOTO(out, rc);
+       if (lop->lop_close == NULL)
+               GOTO(out, -EOPNOTSUPP);
+       rc = lop->lop_close(env, loghandle);
+out:
+       llog_free_handle(loghandle);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_close);
index 3bbb801..50172ff 100644 (file)
@@ -49,7 +49,6 @@
 #endif
 
 #include <obd_class.h>
 #endif
 
 #include <obd_class.h>
-#include <lustre_log.h>
 
 #include "llog_internal.h"
 
 
 #include "llog_internal.h"
 
@@ -81,7 +80,8 @@ static struct llog_handle *llog_cat_new_log(const struct lu_env *env,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                 RETURN(ERR_PTR(-ENOSPC));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                 RETURN(ERR_PTR(-ENOSPC));
 
-       rc = llog_create(env, cathandle->lgh_ctxt, &loghandle, NULL, NULL);
+       rc = llog_open_create(env, cathandle->lgh_ctxt, &loghandle, NULL,
+                             NULL);
        if (rc)
                RETURN(ERR_PTR(rc));
 
        if (rc)
                RETURN(ERR_PTR(rc));
 
@@ -147,53 +147,58 @@ out_destroy:
 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                       struct llog_handle **res, struct llog_logid *logid)
 {
 int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                       struct llog_handle **res, struct llog_logid *logid)
 {
-        struct llog_handle *loghandle;
-        int rc = 0;
-        ENTRY;
-
-        if (cathandle == NULL)
-                RETURN(-EBADF);
+       struct llog_handle      *loghandle;
+       int                      rc = 0;
 
 
-        cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
-                                u.phd.phd_entry) {
-                struct llog_logid *cgl = &loghandle->lgh_id;
+       ENTRY;
 
 
-                if (cgl->lgl_oid == logid->lgl_oid) {
-                        if (cgl->lgl_ogen != logid->lgl_ogen) {
-                                CERROR("log "LPX64" generation %x != %x\n",
-                                       logid->lgl_oid, cgl->lgl_ogen,
-                                       logid->lgl_ogen);
-                                continue;
-                        }
-                        loghandle->u.phd.phd_cat_handle = cathandle;
-                        GOTO(out, rc = 0);
-                }
-        }
+       if (cathandle == NULL)
+               RETURN(-EBADF);
+
+       cfs_list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
+                               u.phd.phd_entry) {
+               struct llog_logid *cgl = &loghandle->lgh_id;
+
+               if (cgl->lgl_oid == logid->lgl_oid) {
+                       if (cgl->lgl_ogen != logid->lgl_ogen) {
+                               CERROR("%s: log "LPX64" generation %x != %x\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      logid->lgl_oid, cgl->lgl_ogen,
+                                      logid->lgl_ogen);
+                               continue;
+                       }
+                       loghandle->u.phd.phd_cat_handle = cathandle;
+                       GOTO(out, rc = 0);
+               }
+       }
 
 
-       rc = llog_create(env, cathandle->lgh_ctxt, &loghandle, logid, NULL);
-       if (rc) {
-               CERROR("error opening log id "LPX64":%x: rc %d\n",
+       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               CERROR("%s: error opening log id "LPX64":%x: rc = %d\n",
+                      cathandle->lgh_ctxt->loc_obd->obd_name,
                       logid->lgl_oid, logid->lgl_ogen, rc);
                       logid->lgl_oid, logid->lgl_ogen, rc);
-       } else {
-               rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
-                if (!rc) {
-                        cfs_list_add(&loghandle->u.phd.phd_entry,
-                                     &cathandle->u.chd.chd_head);
-                }
-        }
-        if (!rc) {
-                loghandle->u.phd.phd_cat_handle = cathandle;
-                loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-                loghandle->u.phd.phd_cookie.lgc_index =
-                        loghandle->lgh_hdr->llh_cat_idx;
-        }
+               GOTO(out, rc);
+       }
 
 
+       rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
+       if (rc < 0) {
+               llog_close(env, loghandle);
+               GOTO(out, rc);
+       }
+
+       cfs_list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+
+       loghandle->u.phd.phd_cat_handle = cathandle;
+       loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
+       loghandle->u.phd.phd_cookie.lgc_index = loghandle->lgh_hdr->llh_cat_idx;
+       EXIT;
 out:
 out:
-        *res = loghandle;
-        RETURN(rc);
+       *res = loghandle;
+       return rc;
 }
 
 }
 
-int llog_cat_put(const struct lu_env *env, struct llog_handle *cathandle)
+int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
 {
        struct llog_handle      *loghandle, *n;
        int                      rc;
 {
        struct llog_handle      *loghandle, *n;
        int                      rc;
@@ -202,14 +207,42 @@ int llog_cat_put(const struct lu_env *env, struct llog_handle *cathandle)
 
        cfs_list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
                                     u.phd.phd_entry) {
 
        cfs_list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
                                     u.phd.phd_entry) {
-               int err = llog_close(env, loghandle);
-               if (err)
-                       CERROR("error closing loghandle\n");
+               struct llog_log_hdr     *llh = loghandle->lgh_hdr;
+               int                      index;
+
+               /* unlink open-not-created llogs */
+               cfs_list_del_init(&loghandle->u.phd.phd_entry);
+               llh = loghandle->lgh_hdr;
+               if (loghandle->lgh_obj != NULL && llh != NULL &&
+                   (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+                   (llh->llh_count == 1)) {
+                       rc = llog_destroy(env, loghandle);
+                       if (rc)
+                               CERROR("%s: failure destroying log during "
+                                      "cleanup: rc = %d\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      rc);
+
+                       index = loghandle->u.phd.phd_cookie.lgc_index;
+
+                       LASSERT(index);
+                       llog_cat_set_first_idx(cathandle, index);
+                       rc = llog_cancel_rec(env, cathandle, index);
+                       if (rc == 0)
+                               CDEBUG(D_RPCTRACE,
+                                      "cancel plain log at index %u of "
+                                      "catalog "LPX64"\n",
+                                      index, cathandle->lgh_id.lgl_oid);
+               }
+               llog_close(env, loghandle);
        }
        }
+       /* if handle was stored in ctxt, remove it too */
+       if (cathandle->lgh_ctxt->loc_handle == cathandle)
+               cathandle->lgh_ctxt->loc_handle = NULL;
        rc = llog_close(env, cathandle);
        RETURN(rc);
 }
        rc = llog_close(env, cathandle);
        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_cat_put);
+EXPORT_SYMBOL(llog_cat_close);
 
 /**
  * lockdep markers for nested struct llog_handle::lgh_lock locking.
 
 /**
  * lockdep markers for nested struct llog_handle::lgh_lock locking.
@@ -331,13 +364,14 @@ int llog_cat_cancel_records(const struct lu_env *env,
                            struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies)
 {
                            struct llog_handle *cathandle, int count,
                            struct llog_cookie *cookies)
 {
-        int i, index, rc = 0;
-        ENTRY;
+       int i, index, rc = 0;
 
 
-        cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
-        for (i = 0; i < count; i++, cookies++) {
-                struct llog_handle *loghandle;
-                struct llog_logid *lgl = &cookies->lgc_lgl;
+       ENTRY;
+
+       cfs_down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+       for (i = 0; i < count; i++, cookies++) {
+               struct llog_handle      *loghandle;
+               struct llog_logid       *lgl = &cookies->lgc_lgl;
 
                rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
                if (rc) {
 
                rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
                if (rc) {
@@ -345,18 +379,18 @@ int llog_cat_cancel_records(const struct lu_env *env,
                        break;
                }
 
                        break;
                }
 
-                cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               cfs_down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                rc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
                rc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
-                cfs_up_write(&loghandle->lgh_lock);
+               cfs_up_write(&loghandle->lgh_lock);
 
 
-                if (rc == 1) {          /* log has been destroyed */
-                        index = loghandle->u.phd.phd_cookie.lgc_index;
-                        if (cathandle->u.chd.chd_current_log == loghandle)
-                                cathandle->u.chd.chd_current_log = NULL;
-                        llog_free_handle(loghandle);
+               if (rc == 1) {          /* log has been destroyed */
+                       index = loghandle->u.phd.phd_cookie.lgc_index;
+                       if (cathandle->u.chd.chd_current_log == loghandle)
+                               cathandle->u.chd.chd_current_log = NULL;
+                       llog_close(env, loghandle);
 
 
-                        LASSERT(index);
-                        llog_cat_set_first_idx(cathandle, index);
+                       LASSERT(index);
+                       llog_cat_set_first_idx(cathandle, index);
                        rc = llog_cancel_rec(env, cathandle, index);
                         if (rc == 0)
                                 CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
                        rc = llog_cancel_rec(env, cathandle, index);
                         if (rc == 0)
                                 CDEBUG(D_RPCTRACE,"cancel plain log at index %u"
@@ -403,11 +437,13 @@ int llog_cat_process_cb(const struct lu_env *env, struct llog_handle *cat_llh,
 
                 cd.lpcd_first_idx = d->lpd_startidx;
                 cd.lpcd_last_idx = 0;
 
                 cd.lpcd_first_idx = d->lpd_startidx;
                 cd.lpcd_last_idx = 0;
-               rc = llog_process(env, llh, d->lpd_cb, d->lpd_data, &cd);
+               rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
+                                         &cd, false);
                 /* Continue processing the next log from idx 0 */
                 d->lpd_startidx = 0;
         } else {
                 /* Continue processing the next log from idx 0 */
                 d->lpd_startidx = 0;
         } else {
-               rc = llog_process(env, llh, d->lpd_cb, d->lpd_data, NULL);
+               rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
+                                         NULL, false);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -484,41 +520,47 @@ int llog_cat_process_thread(void *data)
        LASSERT(lgi);
 
        lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg);
        LASSERT(lgi);
 
        lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg);
-       rc = llog_create(&env, ctxt, &llh, &lgi->lgi_logid, NULL);
-        if (rc) {
-                CERROR("llog_create() failed %d\n", rc);
+       rc = llog_open(&env, ctxt, &llh, &lgi->lgi_logid, NULL,
+                      LLOG_OPEN_EXISTS);
+       if (rc) {
+               CERROR("%s: cannot open llog "LPX64":%x: rc = %d\n",
+                      ctxt->loc_obd->obd_name, lgi->lgi_logid.lgl_oid,
+                      lgi->lgi_logid.lgl_ogen, rc);
                GOTO(out_env, rc);
                GOTO(out_env, rc);
-        }
+       }
        rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL);
        rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL);
-        if (rc) {
-                CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(release_llh, rc);
-        }
+       if (rc) {
+               CERROR("%s: llog_init_handle failed: rc = %d\n",
+                      llh->lgh_ctxt->loc_obd->obd_name, rc);
+               GOTO(release_llh, rc);
+       }
 
 
-        if (cb) {
+       if (cb) {
                rc = llog_cat_process(&env, llh, cb, NULL, 0, 0);
                if (rc != LLOG_PROC_BREAK && rc != 0)
                rc = llog_cat_process(&env, llh, cb, NULL, 0, 0);
                if (rc != LLOG_PROC_BREAK && rc != 0)
-                       CERROR("llog_cat_process() failed %d\n", rc);
+                       CERROR("%s: llog_cat_process() failed: rc = %d\n",
+                              llh->lgh_ctxt->loc_obd->obd_name, rc);
                cb(&env, llh, NULL, NULL);
                cb(&env, llh, NULL, NULL);
-        } else {
-                CWARN("No callback function for recovery\n");
-        }
+       } else {
+               CWARN("No callback function for recovery\n");
+       }
 
 
-        /*
-         * Make sure that all cached data is sent.
-         */
+       /*
+        * Make sure that all cached data is sent.
+        */
        llog_sync(ctxt, NULL, 0);
        llog_sync(ctxt, NULL, 0);
-        GOTO(release_llh, rc);
+       GOTO(release_llh, rc);
 release_llh:
 release_llh:
-       rc = llog_cat_put(&env, llh);
-        if (rc)
-                CERROR("llog_cat_put() failed %d\n", rc);
+       rc = llog_cat_close(&env, llh);
+       if (rc)
+               CERROR("%s: llog_cat_close() failed: rc = %d\n",
+                      llh->lgh_ctxt->loc_obd->obd_name, rc);
 out_env:
        lu_env_fini(&env);
 out:
 out_env:
        lu_env_fini(&env);
 out:
-        llog_ctxt_put(ctxt);
-        OBD_FREE_PTR(args);
-        return rc;
+       llog_ctxt_put(ctxt);
+       OBD_FREE_PTR(args);
+       return rc;
 }
 EXPORT_SYMBOL(llog_cat_process_thread);
 #endif
 }
 EXPORT_SYMBOL(llog_cat_process_thread);
 #endif
@@ -665,7 +707,7 @@ int cat_cancel_cb(const struct lu_env *env, struct llog_handle *cathandle,
                               loghandle->lgh_ctxt->loc_obd->obd_name, rc);
 
                index = loghandle->u.phd.phd_cookie.lgc_index;
                               loghandle->lgh_ctxt->loc_obd->obd_name, rc);
 
                index = loghandle->u.phd.phd_cookie.lgc_index;
-               llog_free_handle(loghandle);
+               llog_close(env, loghandle);
 
 cat_cleanup:
                LASSERT(index);
 
 cat_cleanup:
                LASSERT(index);
index afbb34a..1bd8837 100644 (file)
@@ -252,31 +252,31 @@ static int llog_remove_log(const struct lu_env *env, struct llog_handle *cat,
         index = log->u.phd.phd_cookie.lgc_index;
         LASSERT(index);
        rc = llog_destroy(env, log);
         index = log->u.phd.phd_cookie.lgc_index;
         LASSERT(index);
        rc = llog_destroy(env, log);
-        if (rc) {
-                CDEBUG(D_IOCTL, "cannot destroy log\n");
-                GOTO(out, rc);
-        }
-        llog_cat_set_first_idx(cat, index);
+       if (rc) {
+               CDEBUG(D_IOCTL, "cannot destroy log\n");
+               GOTO(out, rc);
+       }
+       llog_cat_set_first_idx(cat, index);
        rc = llog_cancel_rec(env, cat, index);
 out:
        rc = llog_cancel_rec(env, cat, index);
 out:
-        llog_free_handle(log);
-        cfs_up_write(&cat->lgh_lock);
-        RETURN(rc);
+       cfs_up_write(&cat->lgh_lock);
+       llog_close(env, log);
+       RETURN(rc);
 
 }
 
 static int llog_delete_cb(const struct lu_env *env, struct llog_handle *handle,
                          struct llog_rec_hdr *rec, void *data)
 {
 
 }
 
 static int llog_delete_cb(const struct lu_env *env, struct llog_handle *handle,
                          struct llog_rec_hdr *rec, void *data)
 {
-        struct  llog_logid_rec *lir = (struct llog_logid_rec*)rec;
-        int     rc;
+       struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
+       int                      rc;
 
 
-        ENTRY;
-        if (rec->lrh_type != LLOG_LOGID_MAGIC)
-              RETURN (-EINVAL);
+       ENTRY;
+       if (rec->lrh_type != LLOG_LOGID_MAGIC)
+               RETURN(-EINVAL);
        rc = llog_remove_log(env, handle, &lir->lid_id);
 
        rc = llog_remove_log(env, handle, &lir->lid_id);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 
 }
 
 
@@ -291,12 +291,15 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                 err = str2logid(&logid, data->ioc_inlbuf1, data->ioc_inllen1);
                 if (err)
                         GOTO(out, err);
                 err = str2logid(&logid, data->ioc_inlbuf1, data->ioc_inllen1);
                 if (err)
                         GOTO(out, err);
-               err = llog_create(NULL, ctxt, &handle, &logid, NULL);
+               err = llog_open(NULL, ctxt, &handle, &logid, NULL,
+                               LLOG_OPEN_EXISTS);
                 if (err)
                         GOTO(out, err);
         } else if (*data->ioc_inlbuf1 == '$') {
                 char *name = data->ioc_inlbuf1 + 1;
                 if (err)
                         GOTO(out, err);
         } else if (*data->ioc_inlbuf1 == '$') {
                 char *name = data->ioc_inlbuf1 + 1;
-               err = llog_create(NULL, ctxt, &handle, NULL, name);
+
+               err = llog_open(NULL, ctxt, &handle, NULL, name,
+                               LLOG_OPEN_EXISTS);
                 if (err)
                         GOTO(out, err);
         } else {
                 if (err)
                         GOTO(out, err);
         } else {
@@ -345,9 +348,9 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                 LASSERT(data->ioc_inllen1);
                err = llog_process(NULL, handle, class_config_dump_handler,
                                   data, NULL);
                 LASSERT(data->ioc_inllen1);
                err = llog_process(NULL, handle, class_config_dump_handler,
                                   data, NULL);
-                if (err == -LLOG_EEMPTY)
-                        err = 0;
-                else
+               if (err == -LLOG_EEMPTY)
+                       err = 0;
+               else
                        err = llog_process(NULL, handle, llog_print_cb, data,
                                           NULL);
 
                        err = llog_process(NULL, handle, llog_print_cb, data,
                                           NULL);
 
@@ -385,13 +388,10 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
 
                 if (handle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) {
                         err = llog_destroy(NULL, handle);
 
                 if (handle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) {
                         err = llog_destroy(NULL, handle);
-                        if (!err)
-                                llog_free_handle(handle);
-                        GOTO(out, err);
-                }
-
-                if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
-                        GOTO(out_close, err = -EINVAL);
+                       GOTO(out_close, err);
+               } else if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)) {
+                       GOTO(out_close, err = -EINVAL);
+               }
 
                 if (data->ioc_inlbuf2) {
                         /*remove indicate log from the catalog*/
 
                 if (data->ioc_inlbuf2) {
                         /*remove indicate log from the catalog*/
@@ -402,16 +402,21 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                        err = llog_remove_log(NULL, handle, &plain);
                } else {
                        /* remove all the log of the catalog */
                        err = llog_remove_log(NULL, handle, &plain);
                } else {
                        /* remove all the log of the catalog */
-                       llog_process(NULL, handle, llog_delete_cb, NULL, NULL);
-                }
-                GOTO(out_close, err);
-        }
+                       err = llog_process(NULL, handle, llog_delete_cb, NULL,
+                                          NULL);
+                       if (err)
+                               GOTO(out_close, err);
+               }
+               break;
+       }
+       default:
+               GOTO(out_close, err = -ENOTTY);
         }
 
 out_close:
        if (handle->lgh_hdr &&
            handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
         }
 
 out_close:
        if (handle->lgh_hdr &&
            handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
-               llog_cat_put(NULL, handle);
+               llog_cat_close(NULL, handle);
        else
                llog_close(NULL, handle);
 out:
        else
                llog_close(NULL, handle);
 out:
index a01582b..763649b 100644 (file)
@@ -570,7 +570,7 @@ static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
                 filp = ERR_PTR(-ENAMETOOLONG);
         } else {
                 filp = l_filp_open(logname, flags, mode);
                 filp = ERR_PTR(-ENAMETOOLONG);
         } else {
                 filp = l_filp_open(logname, flags, mode);
-                if (IS_ERR(filp))
+               if (IS_ERR(filp) && PTR_ERR(filp) != -ENOENT)
                         CERROR("logfile creation %s: %ld\n", logname,
                                PTR_ERR(filp));
         }
                         CERROR("logfile creation %s: %ld\n", logname,
                                PTR_ERR(filp));
         }
@@ -578,124 +578,188 @@ static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
         return filp;
 }
 
         return filp;
 }
 
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static int llog_lvfs_create(const struct lu_env *env,
-                           struct llog_ctxt *ctxt, struct llog_handle **res,
-                           struct llog_logid *logid, char *name)
+static int llog_lvfs_open(const struct lu_env *env,  struct llog_handle *handle,
+                         struct llog_logid *logid, char *name,
+                         enum llog_open_param open_param)
 {
 {
-        struct llog_handle *handle;
-        struct obd_device *obd;
-        struct l_dentry *dchild = NULL;
-        struct obdo *oa = NULL;
-        int rc = 0;
-        int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
-        ENTRY;
-
-        handle = llog_alloc_handle();
-        if (handle == NULL)
-                RETURN(-ENOMEM);
-        *res = handle;
-
-        LASSERT(ctxt);
-        LASSERT(ctxt->loc_exp);
-        obd = ctxt->loc_exp->exp_obd;
-
-        if (logid != NULL) {
-                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
-                                             logid->lgl_ogen, logid->lgl_oseq);
-
-                if (IS_ERR(dchild)) {
-                        rc = PTR_ERR(dchild);
-                        CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
-                               logid->lgl_oid, logid->lgl_ogen, rc);
-                        GOTO(out, rc);
-                }
-
-                if (dchild->d_inode == NULL) {
-                        l_dput(dchild);
-                        rc = -ENOENT;
-                        CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
-                               logid->lgl_oid, logid->lgl_oseq, rc);
-                        GOTO(out, rc);
-                }
-
-                /* l_dentry_open will call dput(dchild) if there is an error */
-                handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                                                    O_RDWR | O_LARGEFILE);
-                if (IS_ERR(handle->lgh_file)) {
-                        rc = PTR_ERR(handle->lgh_file);
-                        CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
-                               logid->lgl_oid, logid->lgl_ogen, rc);
-                        GOTO(out, rc);
-                }
-
-                /* assign the value of lgh_id for handle directly */
-                handle->lgh_id = *logid;
-
-        } else if (name) {
-                handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
-                                                  name, open_flags, 0644);
-                if (IS_ERR(handle->lgh_file))
-                        GOTO(out, rc = PTR_ERR(handle->lgh_file));
-
-                handle->lgh_id.lgl_oseq = 1;
-                handle->lgh_id.lgl_oid =
-                        handle->lgh_file->f_dentry->d_inode->i_ino;
-                handle->lgh_id.lgl_ogen =
-                        handle->lgh_file->f_dentry->d_inode->i_generation;
-        } else {
-                OBDO_ALLOC(oa);
-                if (oa == NULL)
-                        GOTO(out, rc = -ENOMEM);
+       struct llog_ctxt        *ctxt = handle->lgh_ctxt;
+       struct l_dentry         *dchild = NULL;
+       struct obd_device       *obd;
+       int                      rc = 0;
+
+       ENTRY;
+
+       LASSERT(ctxt);
+       LASSERT(ctxt->loc_exp);
+       LASSERT(ctxt->loc_exp->exp_obd);
+       obd = ctxt->loc_exp->exp_obd;
+
+       LASSERT(handle);
+       if (logid != NULL) {
+               dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
+                                            logid->lgl_ogen, logid->lgl_oseq);
+               if (IS_ERR(dchild)) {
+                       rc = PTR_ERR(dchild);
+                       CERROR("%s: error looking up logfile #"LPX64"#"
+                              LPX64"#%08x: rc = %d\n",
+                              ctxt->loc_obd->obd_name, logid->lgl_oid,
+                              logid->lgl_oseq, logid->lgl_ogen, rc);
+                       GOTO(out, rc);
+               }
+               if (dchild->d_inode == NULL) {
+                       l_dput(dchild);
+                       rc = -ENOENT;
+                       CERROR("%s: nonexistent llog #"LPX64"#"LPX64"#%08x: "
+                              "rc = %d\n", ctxt->loc_obd->obd_name,
+                              logid->lgl_oid, logid->lgl_oseq,
+                              logid->lgl_ogen, rc);
+                       GOTO(out, rc);
+               }
+               /* l_dentry_open will call dput(dchild) if there is an error */
+               handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+                                                O_RDWR | O_LARGEFILE);
+               if (IS_ERR(handle->lgh_file)) {
+                       rc = PTR_ERR(handle->lgh_file);
+                       handle->lgh_file = NULL;
+                       CERROR("%s: error opening llog #"LPX64"#"LPX64"#%08x: "
+                              "rc = %d\n", ctxt->loc_obd->obd_name,
+                              logid->lgl_oid, logid->lgl_oseq,
+                              logid->lgl_ogen, rc);
+                       GOTO(out, rc);
+               }
 
 
-                oa->o_seq = FID_SEQ_LLOG;
-                oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+               handle->lgh_id = *logid;
+       } else if (name) {
+               handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR, name,
+                                                 O_RDWR | O_LARGEFILE, 0644);
+               if (IS_ERR(handle->lgh_file)) {
+                       rc = PTR_ERR(handle->lgh_file);
+                       handle->lgh_file = NULL;
+                       if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
+                               OBD_ALLOC(handle->lgh_name, strlen(name) + 1);
+                               if (handle->lgh_name)
+                                       strcpy(handle->lgh_name, name);
+                               else
+                                       GOTO(out, rc = -ENOMEM);
+                               rc = 0;
+                       } else {
+                               GOTO(out, rc);
+                       }
+               } else {
+                       handle->lgh_id.lgl_oseq = FID_SEQ_LLOG;
+                       handle->lgh_id.lgl_oid =
+                               handle->lgh_file->f_dentry->d_inode->i_ino;
+                       handle->lgh_id.lgl_ogen =
+                               handle->lgh_file->f_dentry->d_inode->i_generation;
+               }
+       } else {
+               LASSERTF(open_param == LLOG_OPEN_NEW, "%#x\n", open_param);
+               handle->lgh_file = NULL;
+       }
+
+       /* No new llog is expected but doesn't exist */
+       if (open_param != LLOG_OPEN_NEW && handle->lgh_file == NULL)
+               GOTO(out_name, rc = -ENOENT);
+
+       RETURN(0);
+out_name:
+       if (handle->lgh_name != NULL)
+               OBD_FREE(handle->lgh_name, strlen(name) + 1);
+out:
+       RETURN(rc);
+}
 
 
-                rc = obd_create(NULL, ctxt->loc_exp, oa, NULL, NULL);
-                if (rc)
-                        GOTO(out, rc);
+static int llog_lvfs_exist(struct llog_handle *handle)
+{
+       return (handle->lgh_file != NULL);
+}
 
 
-                /* FIXME: rationalize the misuse of o_generation in
-                 *        this API along with mds_obd_{create,destroy}.
-                 *        Hopefully it is only an internal API issue. */
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static int llog_lvfs_create(const struct lu_env *env,
+                           struct llog_handle *handle,
+                           struct thandle *th)
+{
+       struct llog_ctxt        *ctxt = handle->lgh_ctxt;
+       struct obd_device       *obd;
+       struct l_dentry         *dchild = NULL;
+       struct obdo             *oa = NULL;
+       int                      rc = 0;
+       int                      open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+
+       ENTRY;
+
+       LASSERT(ctxt);
+       LASSERT(ctxt->loc_exp);
+       obd = ctxt->loc_exp->exp_obd;
+       LASSERT(handle->lgh_file == NULL);
+
+       if (handle->lgh_name) {
+               handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
+                                                 handle->lgh_name,
+                                                 open_flags, 0644);
+               if (IS_ERR(handle->lgh_file))
+                       RETURN(PTR_ERR(handle->lgh_file));
+
+               handle->lgh_id.lgl_oseq = FID_SEQ_LLOG;
+               handle->lgh_id.lgl_oid =
+                       handle->lgh_file->f_dentry->d_inode->i_ino;
+               handle->lgh_id.lgl_ogen =
+                       handle->lgh_file->f_dentry->d_inode->i_generation;
+       } else {
+               OBDO_ALLOC(oa);
+               if (oa == NULL)
+                       RETURN(-ENOMEM);
+
+               oa->o_seq = FID_SEQ_LLOG;
+               oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+
+               rc = obd_create(NULL, ctxt->loc_exp, oa, NULL, NULL);
+               if (rc)
+                       GOTO(out, rc);
+
+               /* FIXME: rationalize the misuse of o_generation in
+                *        this API along with mds_obd_{create,destroy}.
+                *        Hopefully it is only an internal API issue. */
 #define o_generation o_parent_oid
 #define o_generation o_parent_oid
-                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
-                                             oa->o_generation, oa->o_seq);
-
-                if (IS_ERR(dchild))
-                        GOTO(out, rc = PTR_ERR(dchild));
-
-                handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                                                 open_flags);
-                if (IS_ERR(handle->lgh_file))
-                        GOTO(out, rc = PTR_ERR(handle->lgh_file));
-
-                handle->lgh_id.lgl_oseq = oa->o_seq;
-                handle->lgh_id.lgl_oid = oa->o_id;
-                handle->lgh_id.lgl_ogen = oa->o_generation;
-        }
-
-        handle->lgh_ctxt = ctxt;
+               dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
+                                            oa->o_generation, oa->o_seq);
+               if (IS_ERR(dchild))
+                       GOTO(out, rc = PTR_ERR(dchild));
+
+               handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+                                                open_flags);
+               if (IS_ERR(handle->lgh_file))
+                       GOTO(out, rc = PTR_ERR(handle->lgh_file));
+
+               handle->lgh_id.lgl_oseq = oa->o_seq;
+               handle->lgh_id.lgl_oid = oa->o_id;
+               handle->lgh_id.lgl_ogen = oa->o_generation;
 out:
 out:
-        if (rc)
-                llog_free_handle(handle);
-
-        if (oa)
-                OBDO_FREE(oa);
-        RETURN(rc);
+               OBDO_FREE(oa);
+       }
+       RETURN(rc);
 }
 
 static int llog_lvfs_close(const struct lu_env *env,
                           struct llog_handle *handle)
 {
 }
 
 static int llog_lvfs_close(const struct lu_env *env,
                           struct llog_handle *handle)
 {
-        int rc;
-        ENTRY;
-
-        rc = filp_close(handle->lgh_file, 0);
-        if (rc)
-                CERROR("error closing log: rc %d\n", rc);
-        RETURN(rc);
+       int rc;
+
+       ENTRY;
+
+       if (handle->lgh_file == NULL)
+               RETURN(0);
+       rc = filp_close(handle->lgh_file, 0);
+       if (rc)
+               CERROR("%s: error closing llog #"LPX64"#"LPX64"#%08x: "
+                      "rc = %d\n", handle->lgh_ctxt->loc_obd->obd_name,
+                      handle->lgh_id.lgl_oid, handle->lgh_id.lgl_oseq,
+                      handle->lgh_id.lgl_ogen, rc);
+       handle->lgh_file = NULL;
+       if (handle->lgh_name)
+               OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
+       RETURN(rc);
 }
 
 static int llog_lvfs_destroy(const struct lu_env *env,
 }
 
 static int llog_lvfs_destroy(const struct lu_env *env,
@@ -712,6 +776,7 @@ static int llog_lvfs_destroy(const struct lu_env *env,
 
         dir = MOUNT_CONFIGS_DIR;
 
 
         dir = MOUNT_CONFIGS_DIR;
 
+       LASSERT(handle->lgh_file);
         fdentry = handle->lgh_file->f_dentry;
         inode = fdentry->d_parent->d_inode;
         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
         fdentry = handle->lgh_file->f_dentry;
         inode = fdentry->d_parent->d_inode;
         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
@@ -866,98 +931,33 @@ out1:
 EXPORT_SYMBOL(llog_put_cat_list);
 
 struct llog_operations llog_lvfs_ops = {
 EXPORT_SYMBOL(llog_put_cat_list);
 
 struct llog_operations llog_lvfs_ops = {
-        lop_write_rec:   llog_lvfs_write_rec,
-        lop_next_block:  llog_lvfs_next_block,
-        lop_prev_block:  llog_lvfs_prev_block,
-        lop_read_header: llog_lvfs_read_header,
-        lop_create:      llog_lvfs_create,
-        lop_destroy:     llog_lvfs_destroy,
-        lop_close:       llog_lvfs_close,
-        //        lop_cancel: llog_lvfs_cancel,
+       .lop_write_rec          = llog_lvfs_write_rec,
+       .lop_next_block         = llog_lvfs_next_block,
+       .lop_prev_block         = llog_lvfs_prev_block,
+       .lop_read_header        = llog_lvfs_read_header,
+       .lop_create             = llog_lvfs_create,
+       .lop_destroy            = llog_lvfs_destroy,
+       .lop_close              = llog_lvfs_close,
+       .lop_open               = llog_lvfs_open,
+       .lop_exist              = llog_lvfs_exist,
 };
 };
-
 EXPORT_SYMBOL(llog_lvfs_ops);
 EXPORT_SYMBOL(llog_lvfs_ops);
-
 #else /* !__KERNEL__ */
 #else /* !__KERNEL__ */
-
-static int llog_lvfs_read_header(const struct lu_env *env,
-                                struct llog_handle *handle)
-{
-        LBUG();
-        return 0;
-}
-
-static int llog_lvfs_write_rec(const struct lu_env *env,
-                              struct llog_handle *loghandle,
-                              struct llog_rec_hdr *rec,
-                              struct llog_cookie *reccookie, int cookiecount,
-                              void *buf, int idx)
-{
-        LBUG();
-        return 0;
-}
-
-static int llog_lvfs_next_block(const struct lu_env *env,
-                               struct llog_handle *loghandle, int *cur_idx,
-                               int next_idx, __u64 *cur_offset, void *buf,
-                               int len)
-{
-        LBUG();
-        return 0;
-}
-
-static int llog_lvfs_prev_block(const struct lu_env *env,
-                               struct llog_handle *loghandle,
-                               int prev_idx, void *buf, int len)
-{
-        LBUG();
-        return 0;
-}
-
-static int llog_lvfs_create(const struct lu_env *env,
-                           struct llog_ctxt *ctxt, struct llog_handle **res,
-                           struct llog_logid *logid, char *name)
-{
-        LBUG();
-        return 0;
-}
-
-static int llog_lvfs_close(const struct lu_env *env,
-                          struct llog_handle *handle)
-{
-        LBUG();
-        return 0;
-}
-
-static int llog_lvfs_destroy(const struct lu_env *env,
-                            struct llog_handle *handle)
-{
-        LBUG();
-        return 0;
-}
-
 int llog_get_cat_list(struct obd_device *disk_obd,
 int llog_get_cat_list(struct obd_device *disk_obd,
-                      char *name, int idx, int count, struct llog_catid *idarray)
+                     char *name, int idx, int count,
+                     struct llog_catid *idarray)
 {
 {
-        LBUG();
-        return 0;
+       LBUG();
+       return 0;
 }
 
 int llog_put_cat_list(struct obd_device *disk_obd,
 }
 
 int llog_put_cat_list(struct obd_device *disk_obd,
-                      char *name, int idx, int count, struct llog_catid *idarray)
+                     char *name, int idx, int count,
+                     struct llog_catid *idarray)
 {
 {
-        LBUG();
-        return 0;
+       LBUG();
+       return 0;
 }
 
 }
 
-struct llog_operations llog_lvfs_ops = {
-        lop_write_rec:   llog_lvfs_write_rec,
-        lop_next_block:  llog_lvfs_next_block,
-        lop_prev_block:  llog_lvfs_prev_block,
-        lop_read_header: llog_lvfs_read_header,
-        lop_create:      llog_lvfs_create,
-        lop_destroy:     llog_lvfs_destroy,
-        lop_close:       llog_lvfs_close,
-//        lop_cancel:      llog_lvfs_cancel,
-};
+struct llog_operations llog_lvfs_ops = {};
 #endif
 #endif
index 372536f..b84e9dc 100644 (file)
@@ -316,9 +316,10 @@ int llog_obd_origin_setup(const struct lu_env *env, struct obd_device *obd,
                 RETURN(-ENODEV);
 
         if (logid && logid->lgl_oid) {
                 RETURN(-ENODEV);
 
         if (logid && logid->lgl_oid) {
-               rc = llog_create(env, ctxt, &handle, logid, NULL);
-        } else {
-               rc = llog_create(env, ctxt, &handle, NULL, (char *)name);
+               rc = llog_open(env, ctxt, &handle, logid, NULL,
+                              LLOG_OPEN_EXISTS);
+       } else {
+               rc = llog_open_create(env, ctxt, &handle, NULL, (char *)name);
                 if (!rc && logid)
                         *logid = handle->lgh_id;
         }
                 if (!rc && logid)
                         *logid = handle->lgh_id;
         }
@@ -344,42 +345,13 @@ EXPORT_SYMBOL(llog_obd_origin_setup);
 
 int llog_obd_origin_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
 
 int llog_obd_origin_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
-        struct llog_handle *cathandle, *n, *loghandle;
-        struct llog_log_hdr *llh;
-        int rc, index;
-        ENTRY;
+       ENTRY;
 
 
-        if (!ctxt)
-                RETURN(0);
+       if (!ctxt)
+               RETURN(0);
 
 
-        cathandle = ctxt->loc_handle;
-        if (cathandle) {
-                cfs_list_for_each_entry_safe(loghandle, n,
-                                             &cathandle->u.chd.chd_head,
-                                             u.phd.phd_entry) {
-                        llh = loghandle->lgh_hdr;
-                        if ((llh->llh_flags &
-                                LLOG_F_ZAP_WHEN_EMPTY) &&
-                            (llh->llh_count == 1)) {
-                               rc = llog_destroy(env, loghandle);
-                                if (rc)
-                                        CERROR("failure destroying log during "
-                                               "cleanup: %d\n", rc);
-
-                                index = loghandle->u.phd.phd_cookie.lgc_index;
-                                llog_free_handle(loghandle);
-
-                                LASSERT(index);
-                                llog_cat_set_first_idx(cathandle, index);
-                               rc = llog_cancel_rec(env, cathandle, index);
-                                if (rc == 0)
-                                        CDEBUG(D_RPCTRACE, "cancel plain log at"
-                                               "index %u of catalog "LPX64"\n",
-                                               index,cathandle->lgh_id.lgl_oid);
-                        }
-                }
-               llog_cat_put(env, ctxt->loc_handle);
-        }
+       if (ctxt->loc_handle)
+               llog_cat_close(env, ctxt->loc_handle);
         RETURN(0);
 }
 EXPORT_SYMBOL(llog_obd_origin_cleanup);
         RETURN(0);
 }
 EXPORT_SYMBOL(llog_obd_origin_cleanup);
index 7027a4f..037e0a7 100644 (file)
@@ -99,7 +99,7 @@ static int llog_test_1(struct obd_device *obd, char *name)
         CWARN("1a: create a log with name: %s\n", name);
         LASSERT(ctxt);
 
         CWARN("1a: create a log with name: %s\n", name);
         LASSERT(ctxt);
 
-       rc = llog_create(NULL, ctxt, &llh, NULL, name);
+       rc = llog_open_create(NULL, ctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
                 llog_ctxt_put(ctxt);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
                 llog_ctxt_put(ctxt);
@@ -131,7 +131,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         ENTRY;
 
         CWARN("2a: re-open a log with name: %s\n", name);
         ENTRY;
 
         CWARN("2a: re-open a log with name: %s\n", name);
-       rc = llog_create(NULL, ctxt, llh, NULL, name);
+       rc = llog_open(NULL, ctxt, llh, NULL, name, LLOG_OPEN_EXISTS);
         if (rc) {
                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
                 GOTO(out, rc);
         if (rc) {
                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
                 GOTO(out, rc);
@@ -293,7 +293,7 @@ static int llog_test_4(struct obd_device *obd)
 
         sprintf(name, "%x", llog_test_rand+1);
         CWARN("4a: create a catalog log with name: %s\n", name);
 
         sprintf(name, "%x", llog_test_rand+1);
         CWARN("4a: create a catalog log with name: %s\n", name);
-       rc = llog_create(NULL, ctxt, &cath, NULL, name);
+       rc = llog_open_create(NULL, ctxt, &cath, NULL, name);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
                 GOTO(out, rc);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
                 GOTO(out, rc);
@@ -359,7 +359,7 @@ static int llog_test_4(struct obd_device *obd)
 
  out:
         CWARN("4f: put newly-created catalog\n");
 
  out:
         CWARN("4f: put newly-created catalog\n");
-       rc = llog_cat_put(NULL, cath);
+       rc = llog_cat_close(NULL, cath);
 ctxt_release:
         llog_ctxt_put(ctxt);
         if (rc)
 ctxt_release:
         llog_ctxt_put(ctxt);
         if (rc)
@@ -433,7 +433,7 @@ static int llog_test_5(struct obd_device *obd)
         lmr.lmr_hdr.lrh_type = 0xf00f00;
 
         CWARN("5a: re-open catalog by id\n");
         lmr.lmr_hdr.lrh_type = 0xf00f00;
 
         CWARN("5a: re-open catalog by id\n");
-       rc = llog_create(NULL, ctxt, &llh, &cat_logid, NULL);
+       rc = llog_open(NULL, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
         if (rc) {
                 CERROR("5a: llog_create with logid failed: %d\n", rc);
                 GOTO(out, rc);
         if (rc) {
                 CERROR("5a: llog_create with logid failed: %d\n", rc);
                 GOTO(out, rc);
@@ -486,7 +486,7 @@ static int llog_test_5(struct obd_device *obd)
  out:
         CWARN("5: close re-opened catalog\n");
         if (llh)
  out:
         CWARN("5: close re-opened catalog\n");
         if (llh)
-               rc = llog_cat_put(NULL, llh);
+               rc = llog_cat_close(NULL, llh);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
         llog_ctxt_put(ctxt);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
         llog_ctxt_put(ctxt);
@@ -525,7 +525,7 @@ static int llog_test_6(struct obd_device *obd, char *name)
         }
 
         nctxt = llog_get_context(mgc_obd, LLOG_CONFIG_REPL_CTXT);
         }
 
         nctxt = llog_get_context(mgc_obd, LLOG_CONFIG_REPL_CTXT);
-       rc = llog_create(NULL, nctxt, &llh, NULL, name);
+       rc = llog_open(NULL, nctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
         if (rc) {
                 CERROR("6: llog_create failed %d\n", rc);
                 llog_ctxt_put(nctxt);
         if (rc) {
                 CERROR("6: llog_create failed %d\n", rc);
                 llog_ctxt_put(nctxt);
@@ -570,7 +570,7 @@ static int llog_test_7(struct obd_device *obd)
         CWARN("7: create a log with name: %s\n", name);
         LASSERT(ctxt);
 
         CWARN("7: create a log with name: %s\n", name);
         LASSERT(ctxt);
 
-       rc = llog_create(NULL, ctxt, &llh, NULL, name);
+       rc = llog_open_create(NULL, ctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("7: llog_create with name %s failed: %d\n", name, rc);
                 GOTO(ctxt_release, rc);
         if (rc) {
                 CERROR("7: llog_create with name %s failed: %d\n", name, rc);
                 GOTO(ctxt_release, rc);
@@ -588,8 +588,7 @@ static int llog_test_7(struct obd_device *obd)
        rc = llog_destroy(NULL, llh);
         if (rc)
                 CERROR("7: llog_destroy failed: %d\n", rc);
        rc = llog_destroy(NULL, llh);
         if (rc)
                 CERROR("7: llog_destroy failed: %d\n", rc);
-        else
-                llog_free_handle(llh);
+       llog_close(NULL, llh);
 ctxt_release:
         llog_ctxt_put(ctxt);
         RETURN(rc);
 ctxt_release:
         llog_ctxt_put(ctxt);
         RETURN(rc);
index 9220ac7..54984f5 100644 (file)
@@ -1519,7 +1519,7 @@ int class_config_parse_llog(struct llog_ctxt *ctxt, char *name,
         ENTRY;
 
         CDEBUG(D_INFO, "looking up llog %s\n", name);
         ENTRY;
 
         CDEBUG(D_INFO, "looking up llog %s\n", name);
-       rc = llog_create(NULL, ctxt, &llh, NULL, name);
+       rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
@@ -1619,13 +1619,13 @@ int class_config_dump_llog(struct llog_ctxt *ctxt, char *name,
 
         LCONSOLE_INFO("Dumping config log %s\n", name);
 
 
         LCONSOLE_INFO("Dumping config log %s\n", name);
 
-       rc = llog_create(NULL, ctxt, &llh, NULL, name);
-        if (rc)
-                RETURN(rc);
+       rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
+       if (rc)
+               RETURN(rc);
 
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
 
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(parse_out, rc);
+       if (rc)
+               GOTO(parse_out, rc);
 
        rc = llog_process(NULL, llh, class_config_dump_handler, cfg, NULL);
 parse_out:
 
        rc = llog_process(NULL, llh, class_config_dump_handler, cfg, NULL);
 parse_out:
index 6b07ef2..fff090e 100644 (file)
 
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
 
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
-static int llog_client_create(const struct lu_env *env, struct llog_ctxt *ctxt,
-                             struct llog_handle **res,
-                             struct llog_logid *logid, char *name)
+static int llog_client_open(const struct lu_env *env,
+                           struct llog_handle *lgh, struct llog_logid *logid,
+                           char *name, enum llog_open_param open_param)
 {
         struct obd_import     *imp;
         struct llogd_body     *body;
 {
         struct obd_import     *imp;
         struct llogd_body     *body;
-        struct llog_handle    *handle;
+       struct llog_ctxt      *ctxt = lgh->lgh_ctxt;
         struct ptlrpc_request *req = NULL;
         int                    rc;
         ENTRY;
 
         LLOG_CLIENT_ENTRY(ctxt, imp);
 
         struct ptlrpc_request *req = NULL;
         int                    rc;
         ENTRY;
 
         LLOG_CLIENT_ENTRY(ctxt, imp);
 
-        handle = llog_alloc_handle();
-        if (handle == NULL)
-                RETURN(-ENOMEM);
-        *res = handle;
+       /* client cannot create llog */
+       LASSERTF(open_param != LLOG_OPEN_NEW, "%#x\n", open_param);
+       LASSERT(lgh);
 
 
-        req = ptlrpc_request_alloc(imp, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
-        if (req == NULL)
-                GOTO(err_free, rc = -ENOMEM);
+       req = ptlrpc_request_alloc(imp, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
 
         if (name)
                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
 
         if (name)
                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
@@ -111,7 +110,7 @@ static int llog_client_create(const struct lu_env *env, struct llog_ctxt *ctxt,
         if (rc) {
                 ptlrpc_request_free(req);
                 req = NULL;
         if (rc) {
                 ptlrpc_request_free(req);
                 req = NULL;
-                GOTO(err_free, rc);
+               GOTO(out, rc);
         }
         ptlrpc_request_set_replen(req);
 
         }
         ptlrpc_request_set_replen(req);
 
@@ -130,23 +129,19 @@ static int llog_client_create(const struct lu_env *env, struct llog_ctxt *ctxt,
 
         rc = ptlrpc_queue_wait(req);
         if (rc)
 
         rc = ptlrpc_queue_wait(req);
         if (rc)
-                GOTO(err_free, rc);
+               GOTO(out, rc);
 
 
-        body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        if (body == NULL)
-                GOTO(err_free, rc =-EFAULT);
+       body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EFAULT);
 
 
-        handle->lgh_id = body->lgd_logid;
-        handle->lgh_ctxt = ctxt;
-        EXIT;
+       lgh->lgh_id = body->lgd_logid;
+       lgh->lgh_ctxt = ctxt;
+       EXIT;
 out:
 out:
-        LLOG_CLIENT_EXIT(ctxt, imp);
-        ptlrpc_req_finished(req);
-        return rc;
-err_free:
-        *res = NULL;
-        llog_free_handle(handle);
-        goto out;
+       LLOG_CLIENT_EXIT(ctxt, imp);
+       ptlrpc_req_finished(req);
+       return rc;
 }
 
 static int llog_client_destroy(const struct lu_env *env,
 }
 
 static int llog_client_destroy(const struct lu_env *env,
@@ -169,6 +164,10 @@ static int llog_client_destroy(const struct lu_env *env,
         body->lgd_logid = loghandle->lgh_id;
         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
 
         body->lgd_logid = loghandle->lgh_id;
         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
 
+       if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
+               CERROR("%s: wrong llog flags %x\n", imp->imp_obd->obd_name,
+                      body->lgd_llh_flags);
+
         ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
 
         ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
 
@@ -348,13 +347,12 @@ static int llog_client_close(const struct lu_env *env,
         return(0);
 }
 
         return(0);
 }
 
-
 struct llog_operations llog_client_ops = {
 struct llog_operations llog_client_ops = {
-        lop_next_block:  llog_client_next_block,
-        lop_prev_block:  llog_client_prev_block,
-        lop_read_header: llog_client_read_header,
-        lop_create:      llog_client_create,
-        lop_destroy:     llog_client_destroy,
-        lop_close:       llog_client_close,
+       .lop_next_block         = llog_client_next_block,
+       .lop_prev_block         = llog_client_prev_block,
+       .lop_read_header        = llog_client_read_header,
+       .lop_open               = llog_client_open,
+       .lop_destroy            = llog_client_destroy,
+       .lop_close              = llog_client_close,
 };
 EXPORT_SYMBOL(llog_client_ops);
 };
 EXPORT_SYMBOL(llog_client_ops);
index 7226a1e..a4ea3f1 100644 (file)
 #include <lustre_fsfilt.h>
 
 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
 #include <lustre_fsfilt.h>
 
 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
+static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
+{
+       if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
+               return llog_cat_close(env, lgh);
+       else
+               return llog_close(env, lgh);
+}
 
 
-int llog_origin_handle_create(struct ptlrpc_request *req)
+/* Only open is supported, no new llog can be created remotely */
+int llog_origin_handle_open(struct ptlrpc_request *req)
 {
 {
-        struct obd_export    *exp = req->rq_export;
-        struct obd_device    *obd = exp->exp_obd;
-        struct obd_device    *disk_obd;
-        struct llog_handle   *loghandle;
-        struct llogd_body    *body;
-        struct lvfs_run_ctxt  saved;
-        struct llog_logid    *logid = NULL;
-        struct llog_ctxt     *ctxt;
-        char                 *name = NULL;
-        int                   rc, rc2;
-        ENTRY;
+       struct obd_export       *exp = req->rq_export;
+       struct obd_device       *obd = exp->exp_obd;
+       struct obd_device       *disk_obd;
+       struct lvfs_run_ctxt     saved;
+       struct llog_handle      *loghandle;
+       struct llogd_body       *body;
+       struct llog_logid       *logid = NULL;
+       struct llog_ctxt        *ctxt;
+       char                    *name = NULL;
+       int                      rc;
+
+       ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
@@ -90,8 +99,8 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
-       rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
-                        name);
+       rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
+                      name, LLOG_OPEN_EXISTS);
         if (rc)
                 GOTO(out_pop, rc);
 
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -104,28 +113,24 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
 
        EXIT;
 out_close:
 
        EXIT;
 out_close:
-       rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
-        if (!rc)
-                rc = rc2;
+       llog_origin_close(req->rq_svc_thread->t_env, loghandle);
 out_pop:
 out_pop:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-        return rc;
+       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
+       return rc;
 }
 }
-EXPORT_SYMBOL(llog_origin_handle_create);
+EXPORT_SYMBOL(llog_origin_handle_open);
 
 int llog_origin_handle_destroy(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_destroy(struct ptlrpc_request *req)
 {
-        struct obd_export    *exp = req->rq_export;
-        struct obd_device    *obd = exp->exp_obd;
-        struct obd_device    *disk_obd;
-        struct llog_handle   *loghandle;
-        struct llogd_body    *body;
-        struct lvfs_run_ctxt  saved;
-        struct llog_logid    *logid = NULL;
-        struct llog_ctxt     *ctxt;
-        int                   rc;
-        ENTRY;
+       struct obd_device       *disk_obd;
+       struct lvfs_run_ctxt     saved;
+       struct llogd_body       *body;
+       struct llog_logid       *logid = NULL;
+       struct llog_ctxt        *ctxt;
+       int                      rc;
+
+       ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
@@ -134,47 +139,29 @@ int llog_origin_handle_destroy(struct ptlrpc_request *req)
         if (body->lgd_logid.lgl_oid > 0)
                 logid = &body->lgd_logid;
 
         if (body->lgd_logid.lgl_oid > 0)
                 logid = &body->lgd_logid;
 
-        ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
-        if (ctxt == NULL)
-                RETURN(-ENODEV);
+       if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
+               CERROR("%s: wrong llog flags %x\n",
+                      req->rq_export->exp_obd->obd_name, body->lgd_llh_flags);
 
 
-        disk_obd = ctxt->loc_exp->exp_obd;
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
+       if (ctxt == NULL)
+               RETURN(-ENODEV);
 
 
-       rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
-                        NULL);
-        if (rc)
-                GOTO(out_pop, rc);
+       disk_obd = ctxt->loc_exp->exp_obd;
+       push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
 
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out_close, rc = -ENOMEM);
-
-        body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        body->lgd_logid = loghandle->lgh_id;
-       rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle,
-                             LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(out_close, rc);
-       rc = llog_destroy(req->rq_svc_thread->t_env, loghandle);
-        if (rc)
-                GOTO(out_close, rc);
-        llog_free_handle(loghandle);
-       EXIT;
-out_close:
-        if (rc)
-               llog_close(req->rq_svc_thread->t_env, loghandle);
-out_pop:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-        return rc;
+       rc = req_capsule_server_pack(&req->rq_pill);
+       /* erase only if no error and logid is valid */
+       if (rc == 0)
+               rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
+       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(llog_origin_handle_destroy);
 
 int llog_origin_handle_next_block(struct ptlrpc_request *req)
 {
 }
 EXPORT_SYMBOL(llog_origin_handle_destroy);
 
 int llog_origin_handle_next_block(struct ptlrpc_request *req)
 {
-        struct obd_export   *exp = req->rq_export;
-        struct obd_device   *obd = exp->exp_obd;
         struct obd_device   *disk_obd;
         struct llog_handle  *loghandle;
         struct llogd_body   *body;
         struct obd_device   *disk_obd;
         struct llog_handle  *loghandle;
         struct llogd_body   *body;
@@ -182,43 +169,33 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
         struct lvfs_run_ctxt saved;
         struct llog_ctxt    *ctxt;
         __u32                flags;
         struct lvfs_run_ctxt saved;
         struct llog_ctxt    *ctxt;
         __u32                flags;
-        __u8                *buf;
         void                *ptr;
         void                *ptr;
-        int                  rc, rc2;
+       int                  rc;
+
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf)
-                RETURN(-ENOMEM);
+       ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
+       if (ctxt == NULL)
+               RETURN(-ENODEV);
 
 
-        ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
-        if (ctxt == NULL)
-                GOTO(out_free, rc = -ENODEV);
-        disk_obd = ctxt->loc_exp->exp_obd;
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       disk_obd = ctxt->loc_exp->exp_obd;
+       push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
 
-       rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle,
-                        &body->lgd_logid, NULL);
-        if (rc)
-                GOTO(out_pop, rc);
+       rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
+                      &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
+       if (rc)
+               GOTO(out_pop, rc);
 
 
-        flags = body->lgd_llh_flags;
+       flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
         if (rc)
                 GOTO(out_close, rc);
 
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
         if (rc)
                 GOTO(out_close, rc);
 
-        memset(buf, 0, LLOG_CHUNK_SIZE);
-       rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
-                            &body->lgd_saved_index, body->lgd_index,
-                            &body->lgd_cur_offset, buf, LLOG_CHUNK_SIZE);
-        if (rc)
-                GOTO(out_close, rc);
-
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
                              LLOG_CHUNK_SIZE);
         rc = req_capsule_server_pack(&req->rq_pill);
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
                              LLOG_CHUNK_SIZE);
         rc = req_capsule_server_pack(&req->rq_pill);
@@ -229,25 +206,23 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
         *repbody = *body;
 
         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
         *repbody = *body;
 
         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
-        memcpy(ptr, buf, LLOG_CHUNK_SIZE);
+       rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
+                            &body->lgd_saved_index, body->lgd_index,
+                            &body->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
+       if (rc)
+               GOTO(out_close, rc);
        EXIT;
 out_close:
        EXIT;
 out_close:
-       rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
-        if (!rc)
-                rc = rc2;
+       llog_origin_close(req->rq_svc_thread->t_env, loghandle);
 out_pop:
 out_pop:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-out_free:
-        OBD_FREE(buf, LLOG_CHUNK_SIZE);
-        return rc;
+       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
+       return rc;
 }
 EXPORT_SYMBOL(llog_origin_handle_next_block);
 
 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
 {
 }
 EXPORT_SYMBOL(llog_origin_handle_next_block);
 
 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
 {
-        struct obd_export    *exp = req->rq_export;
-        struct obd_device    *obd = exp->exp_obd;
         struct llog_handle   *loghandle;
         struct llogd_body    *body;
         struct llogd_body    *repbody;
         struct llog_handle   *loghandle;
         struct llogd_body    *body;
         struct llogd_body    *repbody;
@@ -255,43 +230,33 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
         struct lvfs_run_ctxt  saved;
         struct llog_ctxt     *ctxt;
         __u32                 flags;
         struct lvfs_run_ctxt  saved;
         struct llog_ctxt     *ctxt;
         __u32                 flags;
-        __u8                 *buf;
         void                 *ptr;
         void                 *ptr;
-        int                   rc, rc2;
+       int                   rc;
+
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf)
-                RETURN(-ENOMEM);
-
-        ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
-        if (ctxt == NULL)
-                GOTO(out_free, rc = -ENODEV);
+       ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
+       if (ctxt == NULL)
+               RETURN(-ENODEV);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
-       rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle,
-                        &body->lgd_logid, NULL);
-        if (rc)
-                GOTO(out_pop, rc);
+       rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
+                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
+       if (rc)
+               GOTO(out_pop, rc);
 
 
-        flags = body->lgd_llh_flags;
+       flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
         if (rc)
                 GOTO(out_close, rc);
 
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
         if (rc)
                 GOTO(out_close, rc);
 
-        memset(buf, 0, LLOG_CHUNK_SIZE);
-       rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
-                            body->lgd_index, buf, LLOG_CHUNK_SIZE);
-        if (rc)
-                GOTO(out_close, rc);
-
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
                              LLOG_CHUNK_SIZE);
         rc = req_capsule_server_pack(&req->rq_pill);
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
                              LLOG_CHUNK_SIZE);
         rc = req_capsule_server_pack(&req->rq_pill);
@@ -302,26 +267,23 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
         *repbody = *body;
 
         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
         *repbody = *body;
 
         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
-        memcpy(ptr, buf, LLOG_CHUNK_SIZE);
+       rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
+                            body->lgd_index, ptr, LLOG_CHUNK_SIZE);
+       if (rc)
+               GOTO(out_close, rc);
+
        EXIT;
 out_close:
        EXIT;
 out_close:
-       rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
-        if (!rc)
-                rc = rc2;
-
+       llog_origin_close(req->rq_svc_thread->t_env, loghandle);
 out_pop:
 out_pop:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-out_free:
-        OBD_FREE(buf, LLOG_CHUNK_SIZE);
-        return rc;
+       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
+       return rc;
 }
 EXPORT_SYMBOL(llog_origin_handle_prev_block);
 
 int llog_origin_handle_read_header(struct ptlrpc_request *req)
 {
 }
 EXPORT_SYMBOL(llog_origin_handle_prev_block);
 
 int llog_origin_handle_read_header(struct ptlrpc_request *req)
 {
-        struct obd_export    *exp = req->rq_export;
-        struct obd_device    *obd = exp->exp_obd;
         struct obd_device    *disk_obd;
         struct llog_handle   *loghandle;
         struct llogd_body    *body;
         struct obd_device    *disk_obd;
         struct llog_handle   *loghandle;
         struct llogd_body    *body;
@@ -329,33 +291,35 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
         struct lvfs_run_ctxt  saved;
         struct llog_ctxt     *ctxt;
         __u32                 flags;
         struct lvfs_run_ctxt  saved;
         struct llog_ctxt     *ctxt;
         __u32                 flags;
-        int                   rc, rc2;
+       int                   rc;
+
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
-        if (ctxt == NULL)
-                RETURN(-ENODEV);
+       ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
+       if (ctxt == NULL)
+               RETURN(-ENODEV);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
-       rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle,
-                        &body->lgd_logid, NULL);
-        if (rc)
-                GOTO(out_pop, rc);
+       rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
+                      &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
+       if (rc)
+               GOTO(out_pop, rc);
 
 
-        /*
-         * llog_init_handle() reads the llog header
-         */
-        flags = body->lgd_llh_flags;
+       /*
+        * llog_init_handle() reads the llog header
+        */
+       flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
-        if (rc)
-                GOTO(out_close, rc);
+       if (rc)
+               GOTO(out_close, rc);
+       flags = loghandle->lgh_hdr->llh_flags;
 
         rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
 
         rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
@@ -365,13 +329,11 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
         *hdr = *loghandle->lgh_hdr;
         EXIT;
 out_close:
         *hdr = *loghandle->lgh_hdr;
         EXIT;
 out_close:
-       rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
-        if (!rc)
-                rc = rc2;
+       llog_origin_close(req->rq_svc_thread->t_env, loghandle);
 out_pop:
 out_pop:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        llog_ctxt_put(ctxt);
-        return rc;
+       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       llog_ctxt_put(ctxt);
+       return rc;
 }
 EXPORT_SYMBOL(llog_origin_handle_read_header);
 
 }
 EXPORT_SYMBOL(llog_origin_handle_read_header);
 
@@ -385,7 +347,6 @@ EXPORT_SYMBOL(llog_origin_handle_close);
 
 int llog_origin_handle_cancel(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_cancel(struct ptlrpc_request *req)
 {
-        struct obd_device *obd = req->rq_export->exp_obd;
         int num_cookies, rc = 0, err, i, failed = 0;
         struct obd_device *disk_obd;
         struct llog_cookie *logcookies;
         int num_cookies, rc = 0, err, i, failed = 0;
         struct obd_device *disk_obd;
         struct llog_cookie *logcookies;
@@ -404,7 +365,8 @@ int llog_origin_handle_cancel(struct ptlrpc_request *req)
                 RETURN(-EFAULT);
         }
 
                 RETURN(-EFAULT);
         }
 
-        ctxt = llog_get_context(obd, logcookies->lgc_subsys);
+       ctxt = llog_get_context(req->rq_export->exp_obd,
+                               logcookies->lgc_subsys);
         if (ctxt == NULL)
                 RETURN(-ENODEV);
 
         if (ctxt == NULL)
                 RETURN(-ENODEV);
 
@@ -467,7 +429,7 @@ pop_ctxt:
 EXPORT_SYMBOL(llog_origin_handle_cancel);
 
 #else /* !__KERNEL__ */
 EXPORT_SYMBOL(llog_origin_handle_cancel);
 
 #else /* !__KERNEL__ */
-int llog_origin_handle_create(struct ptlrpc_request *req)
+int llog_origin_handle_open(struct ptlrpc_request *req)
 {
         LBUG();
         return 0;
 {
         LBUG();
         return 0;
index e0b8ebb..6c74de2 100644 (file)
@@ -1037,25 +1037,15 @@ int sptlrpc_target_local_copy_conf(struct obd_device *obd,
         }
 
         /* erase the old tmp log */
         }
 
         /* erase the old tmp log */
-       rc = llog_create(NULL, ctxt, &llh, NULL, LOG_SPTLRPC_TMP);
-       if (rc == 0) {
-               rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
-               if (rc == 0) {
-                       rc = llog_destroy(NULL, llh);
-                       llog_free_handle(llh);
-               } else {
-                       llog_close(NULL, llh);
-               }
-        }
-
-        if (rc) {
-                CERROR("target %s: cannot erase temporary sptlrpc log: "
-                       "rc = %d\n", obd->obd_name, rc);
-                GOTO(out_dput, rc);
-        }
+       rc = llog_erase(NULL, ctxt, NULL, LOG_SPTLRPC_TMP);
+       if (rc < 0 && rc != -ENOENT) {
+               CERROR("%s: cannot erase temporary sptlrpc log: rc = %d\n",
+                      obd->obd_name, rc);
+               GOTO(out_dput, rc);
+       }
 
 
-        /* write temporary log */
-       rc = llog_create(NULL, ctxt, &llh, NULL, LOG_SPTLRPC_TMP);
+       /* write temporary log */
+       rc = llog_open_create(NULL, ctxt, &llh, NULL, LOG_SPTLRPC_TMP);
        if (rc)
                GOTO(out_dput, rc);
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
        if (rc)
                GOTO(out_dput, rc);
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
@@ -1066,12 +1056,9 @@ int sptlrpc_target_local_copy_conf(struct obd_device *obd,
 
 out_close:
        llog_close(NULL, llh);
 
 out_close:
        llog_close(NULL, llh);
-
-        if (rc == 0) {
-                rc = lustre_rename(dentry, obd->obd_lvfs_ctxt.pwdmnt,
-                                   LOG_SPTLRPC_TMP, LOG_SPTLRPC);
-        }
-
+       if (rc == 0)
+               rc = lustre_rename(dentry, obd->obd_lvfs_ctxt.pwdmnt,
+                                  LOG_SPTLRPC_TMP, LOG_SPTLRPC);
 out_dput:
         l_dput(dentry);
 out_ctx:
 out_dput:
         l_dput(dentry);
 out_ctx:
@@ -1133,9 +1120,12 @@ int sptlrpc_target_local_read_conf(struct obd_device *obd,
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-       rc = llog_create(NULL, ctxt, &llh, NULL, LOG_SPTLRPC);
-       if (rc)
+       rc = llog_open(NULL, ctxt, &llh, NULL, LOG_SPTLRPC, LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               if (rc == -ENOENT)
+                       rc = 0;
                GOTO(out_pop, rc);
                GOTO(out_pop, rc);
+       }
 
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
         if (rc)
 
        rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
         if (rc)