Whamcloud - gitweb
- steal from the reserved bits instead of the bitmap bits, another
authorphil <phil>
Mon, 8 Sep 2003 07:51:02 +0000 (07:51 +0000)
committerphil <phil>
Mon, 8 Sep 2003 07:51:02 +0000 (07:51 +0000)
  piece of sage Andreas advice
- add an llh_flags field to the header
- rename confusing lgh_index to lgh_last_idx
- if the log is marked ZAP_WHEN_EMPTY, the log is full, and all of the
  records are cancelled, call llog_destroy from llog_cancel_rec
- extend llog_open to also support reopen-by-name
- add llog_vfs_read_blob and llog_vfs_read_header
- removed a bunch of unnecessary EXPORT_SYMBOLs
- add llog_lvfs_open()

lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_lvfs.c

index 356fd52..fd8d939 100644 (file)
@@ -85,14 +85,28 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index)
         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
                 CERROR("catalog index %u already clear?\n", index);
                 LBUG();
-        } else {
-                rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-                if (rc) 
-                        CERROR("failure re-writing header %d\n", rc);
+        }
+
+        llh->llh_count--;
+
+        if (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY &&
+            llh->llh_count == 1 &&
+            loghandle->lgh_last_idx == LLOG_BITMAP_BYTES * 8) {
+                rc = llog_destroy(loghandle);
+                if (rc)
+                        CERROR("failure destroying log after last cancel: %d\n",
+                               rc);
                 LASSERT(rc == 0);
+                RETURN(rc);
         }
+
+        rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+        if (rc) 
+                CERROR("failure re-writing header %d\n", rc);
+        LASSERT(rc == 0);
         RETURN(rc);
 }
+EXPORT_SYMBOL(llog_cancel_rec);
 
 #if 0
 int filter_log_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
index 073dea0..9708180 100644 (file)
@@ -106,9 +106,8 @@ EXPORT_SYMBOL(llog_cat_new_log);
 /* Assumes caller has already pushed us into the kernel context and is locking.
  * We return a lock on the handle to ensure nobody yanks it from us.
  */
-int llog_cat_id2handle(struct llog_handle *cathandle,
-                                          struct llog_handle **res,
-                                          struct llog_logid *logid)
+int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
+                       struct llog_logid *logid)
 {
         struct llog_handle *loghandle;
         int rc = 0;
@@ -130,7 +129,7 @@ int llog_cat_id2handle(struct llog_handle *cathandle,
                 }
         }
 
-        rc = llog_open(cathandle->lgh_obd, &loghandle, logid);
+        rc = llog_open(cathandle->lgh_obd, &loghandle, logid, NULL);
         if (rc) {
                 CERROR("error opening log id "LPX64":%x: rc %d\n",
                        logid->lgl_oid, logid->lgl_ogen, rc);
index d2ac439..bcfcd2e 100644 (file)
@@ -69,7 +69,7 @@ static int llog_lvfs_pad(struct l_file *file, int len, int index)
 }
 
 static int llog_lvfs_write_blob(struct l_file *file, struct llog_rec_hdr *rec,
-                               void *buf, loff_t off)
+                                void *buf, loff_t off)
 {
         int rc;
         struct llog_rec_tail end;
@@ -120,12 +120,56 @@ static int llog_lvfs_write_blob(struct l_file *file, struct llog_rec_hdr *rec,
         RETURN(rc);
 }
 
+static int llog_lvfs_read_blob(struct l_file *file, void *buf, int size,
+                               loff_t off)
+{
+        loff_t offset = off;
+        int rc;
+        ENTRY;
+
+        rc = lustre_fread(file, buf, size, &offset);
+        if (rc != size) {
+                CERROR("error reading log record: rc %d\n", rc);
+                RETURN(-EIO);
+        }
+        RETURN(0);
+}
+
+static int llog_lvfs_read_header(struct llog_handle *handle)
+{
+        struct llog_rec_tail tail;
+        int rc;
+        ENTRY;
+
+        LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
+
+        if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
+                CERROR("not reading header from 0-byte log\n");
+                RETURN(0);
+        }
+
+        rc = llog_lvfs_read_blob(handle->lgh_file, handle->lgh_hdr,
+                                 LLOG_CHUNK_SIZE, 0);
+        if (rc)
+                CERROR("error reading log header\n");
+
+        rc = llog_lvfs_read_blob(handle->lgh_file, &tail, sizeof(tail),
+                                 handle->lgh_file->f_dentry->d_inode->i_size -
+                                 sizeof(tail));
+        if (rc)
+                CERROR("error reading log tail\n");
+
+        handle->lgh_last_idx = tail.lrt_index;
+
+        RETURN(rc);
+}
+
 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
 /* appends if idx == -1, otherwise overwrites record idx. */
-int llog_lvfs_write_rec(struct llog_handle *loghandle,
-                        struct llog_rec_hdr *rec,
-                        struct llog_cookie *reccookie, int cookiecount, 
-                        void *buf, int idx)
+static int llog_lvfs_write_rec(struct llog_handle *loghandle,
+                               struct llog_rec_hdr *rec,
+                               struct llog_cookie *reccookie, int cookiecount, 
+                               void *buf, int idx)
 {
         struct llog_log_hdr *llh;
         int reclen = rec->lrh_len, index, rc;
@@ -142,7 +186,7 @@ int llog_lvfs_write_rec(struct llog_handle *loghandle,
 
                 /* no header: only allowed to insert record 0 */
                 if (idx != 0 && !file->f_dentry->d_inode->i_size) {
-                        CERROR("idx != -1 in empty log ");
+                        CERROR("idx != -1 in empty log\n");
                         LBUG();
                 }
 
@@ -173,13 +217,13 @@ int llog_lvfs_write_rec(struct llog_handle *loghandle,
                         sizeof(struct llog_rec_tail);
 
         if (left != 0 && left <= reclen) {
-                loghandle->lgh_index++;
-                rc = llog_lvfs_pad(file, left, loghandle->lgh_index);
+                loghandle->lgh_last_idx++;
+                rc = llog_lvfs_pad(file, left, loghandle->lgh_last_idx);
                 if (rc)
                         RETURN(rc);
         }
 
-        index = loghandle->lgh_index++;
+        index = loghandle->lgh_last_idx++;
         rec->lrh_index = index;
         if (ext2_set_bit(index, llh->llh_bitmap)) {
                 CERROR("argh, index %u already set in log bitmap?\n", index);
@@ -205,10 +249,10 @@ int llog_lvfs_write_rec(struct llog_handle *loghandle,
         }
         RETURN(rc);
 }
-EXPORT_SYMBOL(llog_lvfs_write_rec);
 
-int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx,
-                         int next_idx, __u64 *cur_offset, void *buf, int len)
+static int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx,
+                                int next_idx, __u64 *cur_offset, void *buf,
+                                int len)
 {
         int rc;
         ENTRY;
@@ -270,13 +314,11 @@ int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx,
 
         RETURN(-ENOENT);
 }
-EXPORT_SYMBOL(llog_lvfs_next_block);
-
 
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
-int llog_lvfs_create(struct obd_device *obd, 
-                                     struct llog_handle **res, char *name)
+static int llog_lvfs_create(struct obd_device *obd, struct llog_handle **res,
+                            char *name)
 {
         char logname[24];
         struct llog_handle *handle;
@@ -290,6 +332,7 @@ int llog_lvfs_create(struct obd_device *obd,
         *res = handle;
 
         if (name) {
+                LASSERT(strlen(name) <= 18);
                 sprintf(logname, "LOGS/%s", name);
                 
                 handle->lgh_file = l_filp_open(logname, open_flags, 0644);
@@ -333,7 +376,79 @@ out_handle:
         return rc;
 }
 
-int llog_lvfs_close(struct llog_handle *handle)
+static int llog_lvfs_open(struct obd_device *obd, struct llog_handle **res, 
+                          struct llog_logid *logid, char *name)
+{
+        struct llog_handle *loghandle;
+        struct l_dentry *dchild = NULL;
+        int rc, cleanup_phase = 1;
+        ENTRY;
+
+        *res = loghandle = llog_alloc_handle();
+        if (loghandle == NULL)
+                RETURN(-ENOMEM);
+        loghandle->lgh_obd = obd;
+
+        if (logid != NULL) {
+                dchild = obd_lvfs_fid2dentry(obd->obd_log_exp, logid->lgl_oid,
+                                             logid->lgl_ogr);
+                if (IS_ERR(dchild)) {
+                        rc = PTR_ERR(dchild);
+                        CERROR("error looking up log file "LPX64":"LPX64
+                               ": rc %d\n",
+                               logid->lgl_oid, logid->lgl_ogr, rc);
+                        GOTO(cleanup, rc);
+                }
+
+                cleanup_phase = 2;
+                if (dchild->d_inode == NULL) {
+                        rc = -ENOENT;
+                        CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
+                               logid->lgl_oid, logid->lgl_ogr, rc);
+                        GOTO(cleanup, rc);
+                }
+
+                loghandle->lgh_file = l_dentry_open(&obd->obd_ctxt, dchild,
+                                                    O_RDWR | O_LARGEFILE);
+                if (IS_ERR(loghandle->lgh_file)) {
+                        rc = PTR_ERR(loghandle->lgh_file);
+                        CERROR("error opening logfile "LPX64":"LPX64": rc %d\n",
+                               logid->lgl_oid, logid->lgl_ogr, rc);
+                        GOTO(cleanup, rc);
+                }
+        } else if (name != NULL) {
+                char logname[24];
+                int open_flags = O_RDWR | O_LARGEFILE;
+                LASSERT(strlen(name) <= 18);
+                sprintf(logname, "LOGS/%s", name);
+
+                loghandle->lgh_file = l_filp_open(logname, open_flags, 0644);
+                if (IS_ERR(loghandle->lgh_file)) {
+                        rc = PTR_ERR(loghandle->lgh_file);
+                        CERROR("logfile open by name %s: %d\n", logname, rc);
+                        GOTO(cleanup, rc);
+                }
+        } else {
+                LBUG();
+        }
+
+        loghandle->lgh_id.lgl_oid =
+                loghandle->lgh_file->f_dentry->d_inode->i_ino;
+        loghandle->lgh_id.lgl_ogen =
+                loghandle->lgh_file->f_dentry->d_inode->i_generation;
+
+        RETURN(0);
+ cleanup:
+        switch (cleanup_phase) {
+        case 2:
+                l_dput(dchild);
+        case 1:
+                llog_free_handle(loghandle);
+        }
+        return rc;
+}
+
+static int llog_lvfs_close(struct llog_handle *handle)
 {
         int rc;
         ENTRY;
@@ -346,7 +461,7 @@ int llog_lvfs_close(struct llog_handle *handle)
         RETURN(rc);
 }
 
-int llog_lvfs_destroy(struct llog_handle *handle)
+static int llog_lvfs_destroy(struct llog_handle *handle)
 {
         struct obdo *oa;
         int rc;
@@ -416,10 +531,6 @@ int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle)
         RETURN(rc);
 }
 
-struct llog_handle *mds_log_open(struct obd_device *obd,
-                                 struct llog_cookie *logcookie);
-
-
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
 struct llog_handle *mds_log_open(struct obd_device *obd,
@@ -651,12 +762,14 @@ out_handle:
 
 
 struct llog_operations llog_lvfs_ops = {
-        lop_write_rec: llog_lvfs_write_rec,
-        lop_next_block: llog_lvfs_next_block,
-        //        lop_open: llog_lvfs_open,
+        lop_write_rec:   llog_lvfs_write_rec,
+        lop_next_block:  llog_lvfs_next_block,
+        lop_open:        llog_lvfs_open,
+        lop_read_header: llog_lvfs_read_header,
+        lop_create:      llog_lvfs_create,
+        lop_destroy:     llog_lvfs_destroy,
+        lop_close:       llog_lvfs_close,
         //        lop_cancel: llog_lvfs_cancel,
-        lop_create:llog_lvfs_create,
-        lop_close:llog_lvfs_close
 };
 
 EXPORT_SYMBOL(llog_lvfs_ops);