Whamcloud - gitweb
a1829bca899223ed7acf4208dbb0a771bcb72d65
[fs/lustre-release.git] / lustre / obdclass / llog_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2005 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LOG
26
27 #ifndef EXPORT_SYMTAB
28 #define EXPORT_SYMTAB
29 #endif
30
31 #ifndef __KERNEL__
32 #include <liblustre.h>
33 #endif
34
35 #include <obd_class.h>
36 #include <lustre_log.h>
37 #include <libcfs/list.h>
38 #include "llog_internal.h"
39
40 /* helper functions for calling the llog obd methods */
41
42 int llog_cleanup(struct llog_ctxt *ctxt)
43 {
44         int rc = 0;
45         ENTRY;
46
47         if (!ctxt) {
48                 CERROR("No ctxt\n");
49                 RETURN(-ENODEV);
50         }
51         
52         if (CTXTP(ctxt, cleanup))
53                 rc = CTXTP(ctxt, cleanup)(ctxt);
54
55         ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
56         if (ctxt->loc_exp)
57                 class_export_put(ctxt->loc_exp);
58         OBD_FREE(ctxt, sizeof(*ctxt));
59
60         RETURN(rc);
61 }
62 EXPORT_SYMBOL(llog_cleanup);
63
64 int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
65                int count, struct llog_logid *logid, struct llog_operations *op)
66 {
67         int rc = 0;
68         struct llog_ctxt *ctxt;
69         ENTRY;
70
71         if (index < 0 || index >= LLOG_MAX_CTXTS)
72                 RETURN(-EFAULT);
73
74         if (obd->obd_llog_ctxt[index]) {
75         /* During an mds_lov_add_ost, we try to tear down and resetup llogs.
76            But the mdt teardown does not flow down to the lov/osc's as the 
77            setup does, because the lov/osc must clean up only when they are
78            done, not when the mdt is done. So instead, we just assume that
79            if the lov llogs are already set up then we must cleanup first. */
80                 CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n", 
81                        obd->obd_name, index);
82                 llog_cleanup(obd->obd_llog_ctxt[index]);
83         }
84
85         OBD_ALLOC(ctxt, sizeof(*ctxt));
86         if (!ctxt)
87                 RETURN(-ENOMEM);
88
89         obd->obd_llog_ctxt[index] = ctxt;
90         ctxt->loc_obd = obd;
91         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
92         ctxt->loc_idx = index;
93         ctxt->loc_logops = op;
94         sema_init(&ctxt->loc_sem, 1);
95
96         if (op->lop_setup)
97                 rc = op->lop_setup(obd, index, disk_obd, count, logid);
98
99         RETURN(rc);
100 }
101 EXPORT_SYMBOL(llog_setup);
102
103 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
104 {
105         int rc = 0;
106         ENTRY;
107
108         if (!ctxt)
109                 RETURN(0);
110
111         if (CTXTP(ctxt, sync))
112                 rc = CTXTP(ctxt, sync)(ctxt, exp);
113
114         RETURN(rc);
115 }
116 EXPORT_SYMBOL(llog_sync);
117
118 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
119                 struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
120                 int numcookies)
121 {
122         int rc;
123         ENTRY;
124
125         if (!ctxt) {
126                 CERROR("No ctxt\n");
127                 RETURN(-ENODEV);
128         }
129         
130         CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
131
132         rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies);
133         RETURN(rc);
134 }
135 EXPORT_SYMBOL(llog_add);
136
137 int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
138                 int count, struct llog_cookie *cookies, int flags)
139 {
140         int rc;
141         ENTRY;
142
143         if (!ctxt) {
144                 CERROR("No ctxt\n");
145                 RETURN(-ENODEV);
146         }
147         
148         CTXT_CHECK_OP(ctxt, cancel, -EOPNOTSUPP);
149         rc = CTXTP(ctxt, cancel)(ctxt, lsm, count, cookies, flags);
150         RETURN(rc);
151 }
152 EXPORT_SYMBOL(llog_cancel);
153
154 /* callback func for llog_process in llog_obd_origin_setup */
155 static int cat_cancel_cb(struct llog_handle *cathandle,
156                           struct llog_rec_hdr *rec, void *data)
157 {
158         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
159         struct llog_handle *loghandle;
160         struct llog_log_hdr *llh;
161         int rc, index;
162         ENTRY;
163
164         if (rec->lrh_type != LLOG_LOGID_MAGIC) {
165                 CERROR("invalid record in catalog\n");
166                 RETURN(-EINVAL);
167         }
168         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
169                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
170                rec->lrh_index, cathandle->lgh_id.lgl_oid);
171
172         rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
173         if (rc) {
174                 CERROR("Cannot find handle for log "LPX64"\n",
175                        lir->lid_id.lgl_oid);
176                 RETURN(rc);
177         }
178
179         llh = loghandle->lgh_hdr;
180         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
181             (llh->llh_count == 1)) {
182                 rc = llog_destroy(loghandle);
183                 if (rc)
184                         CERROR("failure destroying log in postsetup: %d\n", rc);
185
186                 index = loghandle->u.phd.phd_cookie.lgc_index;
187                 llog_free_handle(loghandle);
188
189                 LASSERT(index);
190                 llog_cat_set_first_idx(cathandle, index);
191                 rc = llog_cancel_rec(cathandle, index);
192                 if (rc == 0)
193                         CWARN("cancel log "LPX64":%x at index %u of catalog "
194                               LPX64"\n", lir->lid_id.lgl_oid,
195                               lir->lid_id.lgl_ogen, rec->lrh_index,
196                               cathandle->lgh_id.lgl_oid);
197         }
198
199         RETURN(rc);
200 }
201
202 /* lop_setup method for filter/osc */
203 // XXX how to set exports
204 int llog_obd_origin_setup(struct obd_device *obd, int index,
205                           struct obd_device *disk_obd, int count,
206                           struct llog_logid *logid)
207 {
208         struct llog_ctxt *ctxt;
209         struct llog_handle *handle;
210         struct lvfs_run_ctxt saved;
211         int rc;
212         ENTRY;
213
214         if (count == 0)
215                 RETURN(0);
216
217         LASSERT(count == 1);
218
219         ctxt = llog_get_context(obd, index);
220         LASSERT(ctxt);
221         llog_gen_init(ctxt);
222
223         if (logid->lgl_oid)
224                 rc = llog_create(ctxt, &handle, logid, NULL);
225         else {
226                 rc = llog_create(ctxt, &handle, NULL, NULL);
227                 if (!rc)
228                         *logid = handle->lgh_id;
229         }
230         if (rc)
231                 GOTO(out, rc);
232
233         ctxt->loc_handle = handle;
234         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
235         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
236         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
237         if (rc)
238                 GOTO(out, rc);
239
240         rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
241         if (rc)
242                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
243  out:
244         if (ctxt && rc) {
245                 obd->obd_llog_ctxt[index] = NULL;
246                 OBD_FREE(ctxt, sizeof(*ctxt));
247         }
248         RETURN(rc);
249 }
250 EXPORT_SYMBOL(llog_obd_origin_setup);
251
252 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
253 {
254         struct llog_handle *cathandle, *n, *loghandle;
255         struct llog_log_hdr *llh;
256         int rc, index;
257         ENTRY;
258
259         if (!ctxt)
260                 RETURN(0);
261
262         cathandle = ctxt->loc_handle;
263         if (cathandle) {
264                 list_for_each_entry_safe(loghandle, n,
265                                          &cathandle->u.chd.chd_head,
266                                          u.phd.phd_entry) {
267                         llh = loghandle->lgh_hdr;
268                         if ((llh->llh_flags &
269                                 LLOG_F_ZAP_WHEN_EMPTY) &&
270                             (llh->llh_count == 1)) {
271                                 rc = llog_destroy(loghandle);
272                                 if (rc)
273                                         CERROR("failure destroying log during "
274                                                "cleanup: %d\n", rc);
275
276                                 index = loghandle->u.phd.phd_cookie.lgc_index;
277                                 llog_free_handle(loghandle);
278
279                                 LASSERT(index);
280                                 llog_cat_set_first_idx(cathandle, index);
281                                 rc = llog_cancel_rec(cathandle, index);
282                                 if (rc == 0)
283                                         CDEBUG(D_HA, "cancel plain log at index"
284                                                " %u of catalog "LPX64"\n",
285                                                index,cathandle->lgh_id.lgl_oid);
286                         }
287                 }
288                 llog_cat_put(ctxt->loc_handle);
289         }
290         RETURN(0);
291 }
292 EXPORT_SYMBOL(llog_obd_origin_cleanup);
293
294 /* add for obdfilter/sz and mds/unlink */
295 int llog_obd_origin_add(struct llog_ctxt *ctxt,
296                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
297                         struct llog_cookie *logcookies, int numcookies)
298 {
299         struct llog_handle *cathandle;
300         int rc;
301         ENTRY;
302
303         cathandle = ctxt->loc_handle;
304         LASSERT(cathandle != NULL);
305         rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL);
306         if (rc != 1)
307                 CERROR("write one catalog record failed: %d\n", rc);
308         RETURN(rc);
309 }
310 EXPORT_SYMBOL(llog_obd_origin_add);
311
312 int llog_cat_initialize(struct obd_device *obd, int count)
313 {
314         struct llog_catid *idarray;
315         int size = sizeof(*idarray) * count;
316         char name[32] = CATLIST;
317         int rc;
318         ENTRY;
319
320         OBD_ALLOC(idarray, size);
321         if (!idarray)
322                 RETURN(-ENOMEM);
323
324         rc = llog_get_cat_list(obd, obd, name, count, idarray);
325         if (rc) {
326                 CERROR("rc: %d\n", rc);
327                 GOTO(out, rc);
328         }
329
330         rc = obd_llog_init(obd, obd, count, idarray);
331         if (rc) {
332                 CERROR("rc: %d\n", rc);
333                 GOTO(out, rc);
334         }
335
336         rc = llog_put_cat_list(obd, obd, name, count, idarray);
337         if (rc) {
338                 CERROR("rc: %d\n", rc);
339                 GOTO(out, rc);
340         }
341
342  out:
343         OBD_FREE(idarray, size);
344         RETURN(rc);
345 }
346 EXPORT_SYMBOL(llog_cat_initialize);
347
348 int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
349                   int count, struct llog_catid *logid)
350 {
351         int rc;
352         ENTRY;
353         OBD_CHECK_DT_OP(obd, llog_init, 0);
354         OBD_COUNTER_INCREMENT(obd, llog_init);
355
356         rc = OBP(obd, llog_init)(obd, disk_obd, count, logid);
357         RETURN(rc);
358 }
359 EXPORT_SYMBOL(obd_llog_init);
360
361 int obd_llog_finish(struct obd_device *obd, int count)
362 {
363         int rc;
364         ENTRY;
365         OBD_CHECK_DT_OP(obd, llog_finish, 0);
366         OBD_COUNTER_INCREMENT(obd, llog_finish);
367
368         rc = OBP(obd, llog_finish)(obd, count);
369         RETURN(rc);
370 }
371 EXPORT_SYMBOL(obd_llog_finish);