Whamcloud - gitweb
lu_site_fini(): break reference from lu_device to lu_site
[fs/lustre-release.git] / lustre / obdclass / llog_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2005 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LOG
26
27 #ifndef EXPORT_SYMTAB
28 #define EXPORT_SYMTAB
29 #endif
30
31 #ifdef __KERNEL__
32 #include <linux/fs.h>
33 #else
34 #include <liblustre.h>
35 #endif
36
37 #include <linux/obd_class.h>
38 #include <linux/lustre_log.h>
39 #include <libcfs/list.h>
40 #include "llog_internal.h"
41
42 /* helper functions for calling the llog obd methods */
43
44 int llog_cleanup(struct llog_ctxt *ctxt)
45 {
46         int rc = 0;
47         ENTRY;
48
49         if (!ctxt) {
50                 CERROR("No ctxt\n");
51                 RETURN(-ENODEV);
52         }
53         
54         if (CTXTP(ctxt, cleanup))
55                 rc = CTXTP(ctxt, cleanup)(ctxt);
56
57         ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
58         if (ctxt->loc_exp)
59                 class_export_put(ctxt->loc_exp);
60         OBD_FREE(ctxt, sizeof(*ctxt));
61
62         RETURN(rc);
63 }
64 EXPORT_SYMBOL(llog_cleanup);
65
66 int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
67                int count, struct llog_logid *logid, struct llog_operations *op)
68 {
69         int rc = 0;
70         struct llog_ctxt *ctxt;
71         ENTRY;
72
73         if (index < 0 || index >= LLOG_MAX_CTXTS)
74                 RETURN(-EFAULT);
75
76         if (obd->obd_llog_ctxt[index]) {
77         /* During an mds_lov_add_ost, we try to tear down and resetup llogs.
78            But the mdt teardown does not flow down to the lov/osc's as the 
79            setup does, because the lov/osc must clean up only when they are
80            done, not when the mdt is done. So instead, we just assume that
81            if the lov llogs are already set up then we must cleanup first. */
82                 CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n", 
83                        obd->obd_name, index);
84                 llog_cleanup(obd->obd_llog_ctxt[index]);
85         }
86
87         OBD_ALLOC(ctxt, sizeof(*ctxt));
88         if (!ctxt)
89                 RETURN(-ENOMEM);
90
91         obd->obd_llog_ctxt[index] = ctxt;
92         ctxt->loc_obd = obd;
93         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
94         ctxt->loc_idx = index;
95         ctxt->loc_logops = op;
96         sema_init(&ctxt->loc_sem, 1);
97
98         if (op->lop_setup)
99                 rc = op->lop_setup(obd, index, disk_obd, count, logid);
100
101         RETURN(rc);
102 }
103 EXPORT_SYMBOL(llog_setup);
104
105 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
106 {
107         int rc = 0;
108         ENTRY;
109
110         if (!ctxt)
111                 RETURN(0);
112
113         if (CTXTP(ctxt, sync))
114                 rc = CTXTP(ctxt, sync)(ctxt, exp);
115
116         RETURN(rc);
117 }
118 EXPORT_SYMBOL(llog_sync);
119
120 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
121                 struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
122                 int numcookies)
123 {
124         int rc;
125         ENTRY;
126
127         if (!ctxt) {
128                 CERROR("No ctxt\n");
129                 RETURN(-ENODEV);
130         }
131         
132         CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
133
134         rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies);
135         RETURN(rc);
136 }
137 EXPORT_SYMBOL(llog_add);
138
139 int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
140                 int count, struct llog_cookie *cookies, int flags)
141 {
142         int rc;
143         ENTRY;
144
145         if (!ctxt) {
146                 CERROR("No ctxt\n");
147                 RETURN(-ENODEV);
148         }
149         
150         CTXT_CHECK_OP(ctxt, cancel, -EOPNOTSUPP);
151         rc = CTXTP(ctxt, cancel)(ctxt, lsm, count, cookies, flags);
152         RETURN(rc);
153 }
154 EXPORT_SYMBOL(llog_cancel);
155
156 /* callback func for llog_process in llog_obd_origin_setup */
157 static int cat_cancel_cb(struct llog_handle *cathandle,
158                           struct llog_rec_hdr *rec, void *data)
159 {
160         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
161         struct llog_handle *loghandle;
162         struct llog_log_hdr *llh;
163         int rc, index;
164         ENTRY;
165
166         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
167                 CERROR("invalid record in catalog\n");
168                 RETURN(-EINVAL);
169         }
170         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
171                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
172                rec->lrh_index, cathandle->lgh_id.lgl_oid);
173
174         rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
175         if (rc) {
176                 CERROR("Cannot find handle for log "LPX64"\n",
177                        lir->lid_id.lgl_oid);
178                 RETURN(rc);
179         }
180
181         llh = loghandle->lgh_hdr;
182         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
183             (llh->llh_count == 1)) {
184                 rc = llog_destroy(loghandle);
185                 if (rc)
186                         CERROR("failure destroying log in postsetup: %d\n", rc);
187
188                 index = loghandle->u.phd.phd_cookie.lgc_index;
189                 llog_free_handle(loghandle);
190
191                 LASSERT(index);
192                 llog_cat_set_first_idx(cathandle, index);
193                 rc = llog_cancel_rec(cathandle, index);
194                 if (rc == 0)
195                         CWARN("cancel log "LPX64":%x at index %u of catalog "
196                               LPX64"\n", lir->lid_id.lgl_oid,
197                               lir->lid_id.lgl_ogen, rec->lrh_index,
198                               cathandle->lgh_id.lgl_oid);
199         }
200
201         RETURN(rc);
202 }
203
204 /* lop_setup method for filter/osc */
205 // XXX how to set exports
206 int llog_obd_origin_setup(struct obd_device *obd, int index,
207                           struct obd_device *disk_obd, int count,
208                           struct llog_logid *logid)
209 {
210         struct llog_ctxt *ctxt;
211         struct llog_handle *handle;
212         struct lvfs_run_ctxt saved;
213         int rc;
214         ENTRY;
215
216         if (count == 0)
217                 RETURN(0);
218
219         LASSERT(count == 1);
220
221         ctxt = llog_get_context(obd, index);
222         LASSERT(ctxt);
223         llog_gen_init(ctxt);
224
225         if (logid->lgl_oid)
226                 rc = llog_create(ctxt, &handle, logid, NULL);
227         else {
228                 rc = llog_create(ctxt, &handle, NULL, NULL);
229                 if (!rc)
230                         *logid = handle->lgh_id;
231         }
232         if (rc)
233                 GOTO(out, rc);
234
235         ctxt->loc_handle = handle;
236         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
237         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
238         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
239         if (rc)
240                 GOTO(out, rc);
241
242         rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
243         if (rc)
244                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
245  out:
246         if (ctxt && rc) {
247                 obd->obd_llog_ctxt[index] = NULL;
248                 OBD_FREE(ctxt, sizeof(*ctxt));
249         }
250         RETURN(rc);
251 }
252 EXPORT_SYMBOL(llog_obd_origin_setup);
253
254 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
255 {
256         struct llog_handle *cathandle, *n, *loghandle;
257         struct llog_log_hdr *llh;
258         int rc, index;
259         ENTRY;
260
261         if (!ctxt)
262                 RETURN(0);
263
264         cathandle = ctxt->loc_handle;
265         if (cathandle) {
266                 list_for_each_entry_safe(loghandle, n,
267                                          &cathandle->u.chd.chd_head,
268                                          u.phd.phd_entry) {
269                         llh = loghandle->lgh_hdr;
270                         if ((llh->llh_flags &
271                                 LLOG_F_ZAP_WHEN_EMPTY) &&
272                             (llh->llh_count == 1)) {
273                                 rc = llog_destroy(loghandle);
274                                 if (rc)
275                                         CERROR("failure destroying log during "
276                                                "cleanup: %d\n", rc);
277
278                                 index = loghandle->u.phd.phd_cookie.lgc_index;
279                                 llog_free_handle(loghandle);
280
281                                 LASSERT(index);
282                                 llog_cat_set_first_idx(cathandle, index);
283                                 rc = llog_cancel_rec(cathandle, index);
284                                 if (rc == 0)
285                                         CDEBUG(D_HA, "cancel plain log at index"
286                                                " %u of catalog "LPX64"\n",
287                                                index,cathandle->lgh_id.lgl_oid);
288                         }
289                 }
290                 llog_cat_put(ctxt->loc_handle);
291         }
292         RETURN(0);
293 }
294 EXPORT_SYMBOL(llog_obd_origin_cleanup);
295
296 /* add for obdfilter/sz and mds/unlink */
297 int llog_obd_origin_add(struct llog_ctxt *ctxt,
298                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
299                         struct llog_cookie *logcookies, int numcookies)
300 {
301         struct llog_handle *cathandle;
302         int rc;
303         ENTRY;
304
305         cathandle = ctxt->loc_handle;
306         LASSERT(cathandle != NULL);
307         rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL);
308         if (rc != 1)
309                 CERROR("write one catalog record failed: %d\n", rc);
310         RETURN(rc);
311 }
312 EXPORT_SYMBOL(llog_obd_origin_add);
313
314 int llog_cat_initialize(struct obd_device *obd, int count)
315 {
316         struct llog_catid *idarray;
317         int size = sizeof(*idarray) * count;
318         char name[32] = CATLIST;
319         int rc;
320         ENTRY;
321
322         OBD_ALLOC(idarray, size);
323         if (!idarray)
324                 RETURN(-ENOMEM);
325
326         rc = llog_get_cat_list(obd, obd, name, count, idarray);
327         if (rc) {
328                 CERROR("rc: %d\n", rc);
329                 GOTO(out, rc);
330         }
331
332         rc = obd_llog_init(obd, obd, count, idarray);
333         if (rc) {
334                 CERROR("rc: %d\n", rc);
335                 GOTO(out, rc);
336         }
337
338         rc = llog_put_cat_list(obd, obd, name, count, idarray);
339         if (rc) {
340                 CERROR("rc: %d\n", rc);
341                 GOTO(out, rc);
342         }
343
344  out:
345         OBD_FREE(idarray, size);
346         RETURN(rc);
347 }
348 EXPORT_SYMBOL(llog_cat_initialize);
349
350 int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
351                   int count, struct llog_catid *logid)
352 {
353         int rc;
354         ENTRY;
355         OBD_CHECK_OP(obd, llog_init, 0);
356         OBD_COUNTER_INCREMENT(obd, llog_init);
357
358         rc = OBP(obd, llog_init)(obd, disk_obd, count, logid);
359         RETURN(rc);
360 }
361 EXPORT_SYMBOL(obd_llog_init);
362
363 int obd_llog_finish(struct obd_device *obd, int count)
364 {
365         int rc;
366         ENTRY;
367         OBD_CHECK_OP(obd, llog_finish, 0);
368         OBD_COUNTER_INCREMENT(obd, llog_finish);
369
370         rc = OBP(obd, llog_finish)(obd, count);
371         RETURN(rc);
372 }
373 EXPORT_SYMBOL(obd_llog_finish);