Whamcloud - gitweb
9c9abb735384cf9a1c864a39f16589b979bc1c08
[fs/lustre-release.git] / lustre / obdclass / llog_obd.c
1
2 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  *   You should have received a copy of the GNU General Public License
6  *   along with Lustre; if not, write to the Free Software
7  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
8  */
9
10 #define DEBUG_SUBSYSTEM S_LOG
11
12 #ifndef EXPORT_SYMTAB
13 #define EXPORT_SYMTAB
14 #endif
15
16 #ifdef __KERNEL__
17 #include <linux/fs.h>
18 #else
19 #include <liblustre.h>
20 #endif
21
22 #include <linux/obd_class.h>
23 #include <linux/lustre_log.h>
24 #include <portals/list.h>
25 #include "llog_internal.h"
26
27 /* helper functions for calling the llog obd methods */
28
29 int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, 
30                int count, struct llog_logid *logid, struct llog_operations *op)
31 {
32         int rc = 0;
33         struct llog_ctxt *ctxt;
34         ENTRY;
35
36         if (index < 0 || index >= LLOG_MAX_CTXTS)
37                 RETURN(-EFAULT);
38
39         OBD_ALLOC(ctxt, sizeof(*ctxt));
40         if (!ctxt)
41                 RETURN(-ENOMEM);
42
43         obd->obd_llog_ctxt[index] = ctxt;
44         ctxt->loc_obd = obd;
45         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
46         ctxt->loc_idx = index;
47         ctxt->loc_logops = op;
48         sema_init(&ctxt->loc_sem, 1);
49
50         if (op->lop_setup)
51                 rc = op->lop_setup(obd, index, disk_obd, count, logid);
52         if (ctxt && rc) 
53                 OBD_FREE(ctxt, sizeof(*ctxt));
54
55         RETURN(rc);
56 }
57 EXPORT_SYMBOL(llog_setup);
58
59 int llog_cleanup(struct llog_ctxt *ctxt)
60 {
61         int rc = 0;
62         ENTRY;
63
64         down(&ctxt->loc_sem);
65         LASSERT(ctxt);
66
67         if (CTXTP(ctxt, cleanup))
68                 rc = CTXTP(ctxt, cleanup)(ctxt);
69
70         ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
71         class_export_put(ctxt->loc_exp);
72         ctxt->loc_exp = NULL;
73         up(&ctxt->loc_sem);
74         OBD_FREE(ctxt, sizeof(*ctxt));
75
76         RETURN(rc);
77 }
78 EXPORT_SYMBOL(llog_cleanup);
79
80 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
81 {
82         int rc = 0;
83         ENTRY;
84
85         if (!ctxt)
86                 RETURN(0);
87         down(&ctxt->loc_sem);
88         if (ctxt->loc_llcd && CTXTP(ctxt, sync))
89                 rc = CTXTP(ctxt, sync)(ctxt, exp);
90         else
91                 up(&ctxt->loc_sem);
92
93         RETURN(rc);
94 }
95 EXPORT_SYMBOL(llog_sync);
96
97 int llog_add(struct llog_ctxt *ctxt,
98                  struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
99                  struct llog_cookie *logcookies, int numcookies)
100 {
101         int rc;
102         ENTRY;
103
104         LASSERT(ctxt);
105         down(&ctxt->loc_sem);
106         CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
107
108         rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies);
109         up(&ctxt->loc_sem);
110         RETURN(rc);
111 }
112 EXPORT_SYMBOL(llog_add);
113
114 int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
115                 int count, struct llog_cookie *cookies, int flags)
116 {
117         int rc;
118         ENTRY;
119
120         LASSERT(ctxt);
121         CTXT_CHECK_OP(ctxt, cancel, -EOPNOTSUPP);
122         rc = CTXTP(ctxt, cancel)(ctxt, lsm, count, cookies, flags);
123         RETURN(rc);
124 }
125 EXPORT_SYMBOL(llog_cancel);
126
127 /* callback func for llog_process in llog_obd_origin_setup */
128 static int cat_cancel_cb(struct llog_handle *cathandle, 
129                           struct llog_rec_hdr *rec, void *data)
130 {
131         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
132         struct llog_handle *loghandle;
133         struct llog_log_hdr *llh;
134         int rc, index;
135         ENTRY;
136
137         if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
138                 CERROR("invalid record in catalog\n");
139                 RETURN(-EINVAL);
140         }
141         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", 
142                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
143                le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
144
145         rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
146         if (rc) {
147                 CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
148                 RETURN(rc);
149         }        
150         
151         llh = loghandle->lgh_hdr;
152         if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
153             (le32_to_cpu(llh->llh_count) == 1)) {
154                 rc = llog_destroy(loghandle);
155                 if (rc)
156                         CERROR("failure destroying log during postsetup: %d\n", rc);
157                 LASSERT(rc == 0);
158
159                 index = loghandle->u.phd.phd_cookie.lgc_index;
160                 if (cathandle->u.chd.chd_current_log == loghandle)
161                         cathandle->u.chd.chd_current_log = NULL;
162                 llog_free_handle(loghandle);
163                                                                                                                 
164                 LASSERT(index);
165                 rc = llog_cancel_rec(cathandle, index);
166                 if (rc == 0)
167                         CWARN("cancel log "LPX64":%x at index %u of catalog "LPX64"\n", 
168                                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
169                                le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
170         }
171
172         RETURN(rc);
173 }
174
175 /* lop_setup method for filter/osc */
176 // XXX how to set exports
177 int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
178                           int count, struct llog_logid *logid)
179 {
180         struct llog_ctxt *ctxt;
181         struct llog_handle *handle;
182         struct obd_run_ctxt saved;
183         int rc;
184         ENTRY;
185
186         if (count == 0)
187                 RETURN(0);
188
189         LASSERT(count == 1);
190         
191         ctxt = llog_get_context(obd, index);
192         LASSERT(ctxt);
193         log_gen_init(ctxt);
194
195         down(&ctxt->loc_sem);
196         if (logid->lgl_oid)
197                 rc = llog_create(ctxt, &handle, logid, NULL);
198         else {
199                 rc = llog_create(ctxt, &handle, NULL, NULL);
200                 if (!rc) 
201                         *logid = handle->lgh_id;
202         }
203         if (rc) 
204                 GOTO(out, rc);
205
206         ctxt->loc_handle = handle;
207         push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
208         rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
209         pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
210         if (rc)
211                 GOTO(out, rc);
212
213         rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL);
214         if (rc) 
215                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
216  out:
217         up(&ctxt->loc_sem);
218         if (ctxt && rc) {
219                 obd->obd_llog_ctxt[index] = NULL;
220                 OBD_FREE(ctxt, sizeof(*ctxt));
221         }
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(llog_obd_origin_setup);
225
226 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
227 {
228         struct llog_handle *cathandle, *n, *loghandle;
229         struct llog_log_hdr *llh;
230         int rc, index;
231         ENTRY;
232         
233         if (!ctxt)
234                 return 0;
235
236         cathandle = ctxt->loc_handle;
237         if (cathandle) {
238                 list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head, 
239                                  u.phd.phd_entry) {
240                         llh = loghandle->lgh_hdr;
241                         if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
242                             (le32_to_cpu(llh->llh_count) == 1)) {
243                                 rc = llog_destroy(loghandle);
244                                 if (rc)
245                                         CERROR("failure destroying log during cleanup: %d\n",
246                                                rc);
247                                 LASSERT(rc == 0);
248
249                                 index = loghandle->u.phd.phd_cookie.lgc_index;
250                                 if (cathandle->u.chd.chd_current_log == loghandle)
251                                         cathandle->u.chd.chd_current_log = NULL;
252                                 llog_free_handle(loghandle);
253                                                                                                                              
254                                 LASSERT(index);
255                                 rc = llog_cancel_rec(cathandle, index);
256                                 if (rc == 0)
257                                         CWARN("cancel plain log at index %u of catalog "LPX64"\n", 
258                                               index, cathandle->lgh_id.lgl_oid);
259                         }
260                 }
261                 llog_cat_put(ctxt->loc_handle);
262         }
263         return 0;
264 }
265 EXPORT_SYMBOL(llog_obd_origin_cleanup);
266
267
268 /* add for obdfilter/sz and mds/unlink */
269 int llog_obd_origin_add(struct llog_ctxt *ctxt,
270                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
271                         struct llog_cookie *logcookies, int numcookies)
272 {
273         struct llog_handle *cathandle;
274         int rc;
275         ENTRY;
276
277         cathandle = ctxt->loc_handle;
278         LASSERT(cathandle != NULL);
279         rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL);
280         if (rc != 1)
281                 CERROR("write one catalog record failed: %d\n", rc);
282         RETURN(rc);
283 }
284 EXPORT_SYMBOL(llog_obd_origin_add);
285
286 int llog_cat_initialize(struct obd_device *obd, int count)
287 {
288         struct llog_logid *idarray;
289         int size = sizeof(*idarray) * count;
290         char name[32] = "CATLIST";
291         int rc;
292         ENTRY;
293
294         OBD_ALLOC(idarray, size);
295         if (!idarray)
296                 RETURN(-ENOMEM);
297
298         memset(idarray, 0, size);
299
300         rc = llog_get_cat_list(obd, obd, name, count, idarray);
301         if (rc) {
302                 CERROR("rc: %d\n", rc);
303                 GOTO(out, rc);
304         }
305
306         rc = obd_llog_init(obd, obd, count, idarray);
307         if (rc) {
308                 CERROR("rc: %d\n", rc);
309                 GOTO(out, rc);
310         }
311
312         rc = llog_put_cat_list(obd, obd, name, count, idarray);
313         if (rc) {
314                 CERROR("rc: %d\n", rc);
315                 GOTO(out, rc);
316         }
317                         
318  out:
319         OBD_FREE(idarray, size);
320         RETURN(rc);
321 }
322 EXPORT_SYMBOL(llog_cat_initialize);
323
324 int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
325                   int count, struct llog_logid *logid)
326 {
327         int rc;
328         ENTRY;
329         OBD_CHECK_OP(obd, llog_init, 0);
330         OBD_COUNTER_INCREMENT(obd, llog_init);
331
332         rc = OBP(obd, llog_init)(obd, disk_obd, count, logid);
333         RETURN(rc);
334 }
335 EXPORT_SYMBOL(obd_llog_init);
336
337 int obd_llog_finish(struct obd_device *obd, int count)
338 {
339         int rc;
340         ENTRY;
341         OBD_CHECK_OP(obd, llog_finish, 0);
342         OBD_COUNTER_INCREMENT(obd, llog_finish);
343
344         rc = OBP(obd, llog_finish)(obd, count);
345         RETURN(rc);
346 }
347 EXPORT_SYMBOL(obd_llog_finish);