Whamcloud - gitweb
port llog fixes from b1_6 into HEAD
[fs/lustre-release.git] / lustre / mds / mds_log.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/mds_log.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #ifndef AUTOCONF_INCLUDED
33 #include <linux/config.h>
34 #endif
35 #include <linux/module.h>
36 #include <linux/version.h>
37
38 #include <libcfs/list.h>
39 #include <obd_class.h>
40 #include <lustre_fsfilt.h>
41 #include <lustre_mds.h>
42 #include <lustre_commit_confd.h>
43 #include <lustre_log.h>
44
45 #include "mds_internal.h"
46
47 static int mds_llog_origin_add(struct llog_ctxt *ctxt,
48                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
49                         struct llog_cookie *logcookies, int numcookies)
50 {
51         struct obd_device *obd = ctxt->loc_obd;
52         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
53         struct llog_ctxt *lctxt;
54         int rc;
55         ENTRY;
56
57         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
58         rc = llog_add(lctxt, rec, lsm, logcookies, numcookies);
59         llog_ctxt_put(lctxt);
60
61         RETURN(rc);
62 }
63
64 static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
65                                    struct llog_logid *logid,
66                                    struct llog_gen *gen,
67                                    struct obd_uuid *uuid)
68 {
69         struct obd_device *obd = ctxt->loc_obd;
70         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
71         struct llog_ctxt *lctxt;
72         int rc;
73         ENTRY;
74
75         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
76         rc = llog_connect(lctxt, count, logid, gen, uuid);
77         llog_ctxt_put(lctxt);
78         RETURN(rc);
79 }
80
81 static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
82                           int count, struct llog_cookie *cookies, int flags)
83 {
84         struct obd_device *obd = ctxt->loc_obd;
85         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
86         struct llog_ctxt *lctxt;
87         int rc;
88         ENTRY;
89
90         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
91         rc = llog_cancel(lctxt, lsm, count, cookies, flags);
92         llog_ctxt_put(lctxt);
93         RETURN(rc);
94 }
95
96 int mds_log_op_unlink(struct obd_device *obd, 
97                       struct lov_mds_md *lmm, int lmm_size,
98                       struct llog_cookie *logcookies, int cookies_size)
99 {
100         struct mds_obd *mds = &obd->u.mds;
101         struct lov_stripe_md *lsm = NULL;
102         struct llog_unlink_rec *lur;
103         struct llog_ctxt *ctxt;
104         int rc;
105         ENTRY;
106
107         if (IS_ERR(mds->mds_osc_obd))
108                 RETURN(PTR_ERR(mds->mds_osc_obd));
109
110         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
111         if (rc < 0)
112                 RETURN(rc);
113         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
114         if (rc)
115                 GOTO(out, rc);
116         /* first prepare unlink log record */
117         OBD_ALLOC(lur, sizeof(*lur));
118         if (!lur)
119                 GOTO(out, rc = -ENOMEM);
120         lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur);
121         lur->lur_hdr.lrh_type = MDS_UNLINK_REC;
122
123         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
124         rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
125                       cookies_size / sizeof(struct llog_cookie));
126         llog_ctxt_put(ctxt);
127
128         OBD_FREE(lur, sizeof(*lur));
129 out:
130         obd_free_memmd(mds->mds_osc_exp, &lsm);
131         RETURN(rc);
132 }
133 EXPORT_SYMBOL(mds_log_op_unlink);
134 int mds_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
135                       struct lov_mds_md *lmm, int lmm_size,
136                       struct llog_cookie *logcookies, int cookies_size)
137 {
138         struct mds_obd *mds = &obd->u.mds;
139         struct lov_stripe_md *lsm = NULL;
140         struct llog_setattr_rec *lsr;
141         struct llog_ctxt *ctxt;
142         int rc;
143         ENTRY;
144
145         if (IS_ERR(mds->mds_osc_obd))
146                 RETURN(PTR_ERR(mds->mds_osc_obd));
147
148         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
149         if (rc < 0)
150                 RETURN(rc);
151
152         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
153         if (rc)
154                 GOTO(out, rc);
155
156         OBD_ALLOC(lsr, sizeof(*lsr));
157         if (!lsr)
158                 GOTO(out, rc = -ENOMEM);
159
160         /* prepare setattr log record */
161         lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr);
162         lsr->lsr_hdr.lrh_type = MDS_SETATTR_REC;
163         lsr->lsr_uid = uid;
164         lsr->lsr_gid = gid;
165
166         /* write setattr log */
167         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
168         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
169                       cookies_size / sizeof(struct llog_cookie));
170
171         llog_ctxt_put(ctxt);
172
173         OBD_FREE(lsr, sizeof(*lsr));
174  out:
175         obd_free_memmd(mds->mds_osc_exp, &lsm);
176         RETURN(rc);
177 }
178 EXPORT_SYMBOL(mds_log_op_setattr);
179
180 static struct llog_operations mds_ost_orig_logops = {
181         lop_add:        mds_llog_origin_add,
182         lop_connect:    mds_llog_origin_connect,
183 };
184
185 static struct llog_operations mds_size_repl_logops = {
186         lop_cancel:     mds_llog_repl_cancel,
187 };
188
189 int mds_llog_init(struct obd_device *obd, int group,
190                   struct obd_device *tgt, int count, struct llog_catid *logid, 
191                   struct obd_uuid *uuid)
192 {
193         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
194         int rc;
195         ENTRY;
196
197         LASSERT(group == OBD_LLOG_GROUP);
198         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
199                         &mds_ost_orig_logops);
200         if (rc)
201                 RETURN(rc);
202
203         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
204                         &mds_size_repl_logops);
205         if (rc)
206                 RETURN(rc);
207
208         rc = obd_llog_init(lov_obd, group, tgt, count, logid, uuid);
209         if (rc)
210                 CERROR("lov_llog_init err %d\n", rc);
211
212         RETURN(rc);
213 }
214
215 int mds_llog_finish(struct obd_device *obd, int count)
216 {
217         struct llog_ctxt *ctxt;
218         int rc = 0, rc2 = 0;
219         ENTRY;
220
221         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
222         if (ctxt)
223                 rc = llog_cleanup(ctxt);
224
225         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
226         if (ctxt)
227                 rc2 = llog_cleanup(ctxt);
228         if (!rc)
229                 rc = rc2;
230
231         RETURN(rc);
232 }