Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / obdclass / llog_obd.c
1
2 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  *   You should have received a copy of the GNU General Public License
6  *   along with Lustre; if not, write to the Free Software
7  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
8  */
9
10 #define DEBUG_SUBSYSTEM S_LOG
11
12 #ifndef EXPORT_SYMTAB
13 #define EXPORT_SYMTAB
14 #endif
15
16 #ifdef __KERNEL__
17 #include <linux/fs.h>
18 #else
19 #include <liblustre.h>
20 #endif
21
22 #include <linux/obd_class.h>
23 #include <linux/lustre_log.h>
24 #include <portals/list.h>
25 #include "llog_internal.h"
26
27 /* helper functions for calling the llog obd methods */
28
29 int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
30                struct obd_device *disk_obd, int count, struct llog_logid *logid,
31                struct llog_operations *op)
32 {
33         int rc = 0;
34         struct llog_ctxt *ctxt;
35         ENTRY;
36
37         LASSERT(llogs);
38
39         if (index < 0 || index >= LLOG_MAX_CTXTS)
40                 RETURN(-EFAULT);
41
42         OBD_ALLOC(ctxt, sizeof(*ctxt));
43         if (!ctxt)
44                 RETURN(-ENOMEM);
45
46         llogs->llog_ctxt[index] = ctxt;
47         ctxt->loc_obd = obd;
48         ctxt->loc_llogs = llogs;
49         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
50         ctxt->loc_idx = index;
51         ctxt->loc_logops = op;
52         sema_init(&ctxt->loc_sem, 1);
53
54         if (op->lop_setup)
55                 rc = op->lop_setup(obd, llogs, index, disk_obd, count, logid);
56         if (ctxt && rc)
57                 OBD_FREE(ctxt, sizeof(*ctxt));
58
59         RETURN(rc);
60 }
61 EXPORT_SYMBOL(llog_setup);
62
63 int llog_cleanup(struct llog_ctxt *ctxt)
64 {
65         int rc = 0;
66         ENTRY;
67
68         LASSERT(ctxt);
69
70         if (CTXTP(ctxt, cleanup))
71                 rc = CTXTP(ctxt, cleanup)(ctxt);
72
73         ctxt->loc_llogs->llog_ctxt[ctxt->loc_idx] = NULL;
74         class_export_put(ctxt->loc_exp);
75         ctxt->loc_exp = NULL;
76         OBD_FREE(ctxt, sizeof(*ctxt));
77
78         RETURN(rc);
79 }
80 EXPORT_SYMBOL(llog_cleanup);
81
82 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
83 {
84         int rc = 0;
85         ENTRY;
86
87         if (!ctxt)
88                 RETURN(0);
89
90         if (CTXTP(ctxt, sync))
91                 rc = CTXTP(ctxt, sync)(ctxt, exp);
92
93         RETURN(rc);
94 }
95 EXPORT_SYMBOL(llog_sync);
96
97 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
98                 struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
99                 int numcookies)
100 {
101         int rc;
102         ENTRY;
103
104         LASSERT(ctxt);
105         CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
106
107         rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies);
108         RETURN(rc);
109 }
110 EXPORT_SYMBOL(llog_add);
111
112 int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
113                 int count, struct llog_cookie *cookies, int flags)
114 {
115         int rc;
116         ENTRY;
117
118         LASSERT(ctxt);
119         CTXT_CHECK_OP(ctxt, cancel, -EOPNOTSUPP);
120         rc = CTXTP(ctxt, cancel)(ctxt, lsm, count, cookies, flags);
121         RETURN(rc);
122 }
123 EXPORT_SYMBOL(llog_cancel);
124
125 /* callback func for llog_process in llog_obd_origin_setup */
126 static int cat_cancel_cb(struct llog_handle *cathandle,
127                           struct llog_rec_hdr *rec, void *data)
128 {
129         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
130         struct llog_handle *loghandle;
131         struct llog_log_hdr *llh;
132         int rc, index;
133         ENTRY;
134
135         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
136                 CERROR("invalid record in catalog\n");
137                 RETURN(-EINVAL);
138         }
139         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
140                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
141                rec->lrh_index, cathandle->lgh_id.lgl_oid);
142
143         rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
144         if (rc) {
145                 CERROR("Cannot find handle for log "LPX64"\n",
146                        lir->lid_id.lgl_oid);
147                 RETURN(rc);
148         }
149
150         llh = loghandle->lgh_hdr;
151         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
152             (llh->llh_count == 1)) {
153                 rc = llog_destroy(loghandle);
154                 if (rc)
155                         CERROR("failure destroying log in postsetup: %d\n", rc);
156                 LASSERT(rc == 0);
157
158                 index = loghandle->u.phd.phd_cookie.lgc_index;
159                 llog_free_handle(loghandle);
160
161                 LASSERT(index);
162                 llog_cat_set_first_idx(cathandle, index);
163                 rc = llog_cancel_rec(cathandle, index);
164                 if (rc == 0)
165                         CWARN("cancel log "LPX64":%x at index %u of catalog "
166                               LPX64"\n", lir->lid_id.lgl_oid,
167                               lir->lid_id.lgl_ogen, rec->lrh_index,
168                               cathandle->lgh_id.lgl_oid);
169         }
170
171         RETURN(rc);
172 }
173
174 /* lop_setup method for filter/osc */
175 // XXX how to set exports
176 int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
177                           int index, struct obd_device *disk_obd, int count,
178                           struct llog_logid *logid)
179 {
180         struct llog_ctxt *ctxt;
181         struct llog_handle *handle;
182         struct obd_run_ctxt saved;
183         int rc;
184         ENTRY;
185
186         if (count == 0)
187                 RETURN(0);
188
189         LASSERT(count == 1);
190
191         ctxt = llog_get_context(llogs, index);
192         LASSERT(ctxt);
193         llog_gen_init(ctxt);
194
195         if (logid->lgl_oid)
196                 rc = llog_create(ctxt, &handle, logid, NULL);
197         else {
198                 rc = llog_create(ctxt, &handle, NULL, NULL);
199                 if (!rc)
200                         *logid = handle->lgh_id;
201         }
202         if (rc)
203                 GOTO(out, rc);
204
205         ctxt->loc_handle = handle;
206         push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
207         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
208         pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
209         if (rc)
210                 GOTO(out, rc);
211
212         rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
213         if (rc)
214                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
215  out:
216         if (ctxt && rc) {
217                 llogs->llog_ctxt[index] = NULL;
218                 OBD_FREE(ctxt, sizeof(*ctxt));
219         }
220         RETURN(rc);
221 }
222 EXPORT_SYMBOL(llog_obd_origin_setup);
223
224 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
225 {
226         struct llog_handle *cathandle, *n, *loghandle;
227         struct llog_log_hdr *llh;
228         int rc, index;
229         ENTRY;
230
231         if (!ctxt)
232                 return 0;
233
234         cathandle = ctxt->loc_handle;
235         if (cathandle) {
236                 list_for_each_entry_safe(loghandle, n,
237                                          &cathandle->u.chd.chd_head,
238                                          u.phd.phd_entry) {
239                         llh = loghandle->lgh_hdr;
240                         if ((llh->llh_flags &
241                                 LLOG_F_ZAP_WHEN_EMPTY) &&
242                             (llh->llh_count == 1)) {
243                                 rc = llog_destroy(loghandle);
244                                 if (rc)
245                                         CERROR("failure destroying log during "
246                                                "cleanup: %d\n", rc);
247                                 LASSERT(rc == 0);
248
249                                 index = loghandle->u.phd.phd_cookie.lgc_index;
250                                 llog_free_handle(loghandle);
251
252                                 LASSERT(index);
253                                 llog_cat_set_first_idx(cathandle, index);
254                                 rc = llog_cancel_rec(cathandle, index);
255                                 if (rc == 0)
256                                         CDEBUG(D_HA, "cancel plain log at index"
257                                                " %u of catalog "LPX64"\n",
258                                                index,cathandle->lgh_id.lgl_oid);
259                         }
260                 }
261                 llog_cat_put(ctxt->loc_handle);
262         }
263         return 0;
264 }
265 EXPORT_SYMBOL(llog_obd_origin_cleanup);
266
267 /* add for obdfilter/sz and mds/unlink */
268 int llog_obd_origin_add(struct llog_ctxt *ctxt,
269                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
270                         struct llog_cookie *logcookies, int numcookies)
271 {
272         struct llog_handle *cathandle;
273         int rc;
274         ENTRY;
275
276         cathandle = ctxt->loc_handle;
277         LASSERT(cathandle != NULL);
278         rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL);
279         if (rc != 1)
280                 CERROR("write one catalog record failed: %d\n", rc);
281         RETURN(rc);
282 }
283 EXPORT_SYMBOL(llog_obd_origin_add);
284
285 int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs, int count)
286 {
287         struct llog_catid *idarray;
288         int size = sizeof(*idarray) * count;
289         char name[32] = CATLIST;
290         int rc;
291         ENTRY;
292
293         OBD_ALLOC(idarray, size);
294         if (!idarray)
295                 RETURN(-ENOMEM);
296
297         rc = llog_get_cat_list(obd, obd, name, count, idarray);
298         if (rc) {
299                 CERROR("rc: %d\n", rc);
300                 GOTO(out, rc);
301         }
302
303         rc = obd_llog_init(obd, llogs, obd, count, idarray);
304         if (rc) {
305                 CERROR("rc: %d\n", rc);
306                 GOTO(out, rc);
307         }
308
309         rc = llog_put_cat_list(obd, obd, name, count, idarray);
310         if (rc) {
311                 CERROR("rc: %d\n", rc);
312                 GOTO(out, rc);
313         }
314
315  out:
316         OBD_FREE(idarray, size);
317         RETURN(rc);
318 }
319 EXPORT_SYMBOL(llog_cat_initialize);
320
321 int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
322                   struct obd_device *disk_obd, int count,
323                   struct llog_catid *logid)
324 {
325         int rc;
326         ENTRY;
327         OBD_CHECK_OP(obd, llog_init, 0);
328         OBD_COUNTER_INCREMENT(obd, llog_init);
329
330         rc = OBP(obd, llog_init)(obd, llogs, disk_obd, count, logid);
331         RETURN(rc);
332 }
333 EXPORT_SYMBOL(obd_llog_init);
334
335 int obd_llog_finish(struct obd_device *obd, struct obd_llogs *llogs, int count)
336 {
337         int rc;
338         ENTRY;
339         OBD_CHECK_OP(obd, llog_finish, 0);
340         OBD_COUNTER_INCREMENT(obd, llog_finish);
341
342         rc = OBP(obd, llog_finish)(obd, llogs, count);
343         RETURN(rc);
344 }
345 EXPORT_SYMBOL(obd_llog_finish);