Whamcloud - gitweb
LU-1866 lfsck: enhance otable-based iteration
[fs/lustre-release.git] / lustre / obdclass / llog_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOG
38
39 #ifndef __KERNEL__
40 #include <liblustre.h>
41 #endif
42
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include "llog_internal.h"
46
47 /* helper functions for calling the llog obd methods */
48 static struct llog_ctxt* llog_new_ctxt(struct obd_device *obd)
49 {
50         struct llog_ctxt *ctxt;
51
52         OBD_ALLOC_PTR(ctxt);
53         if (!ctxt)
54                 return NULL;
55
56         ctxt->loc_obd = obd;
57         cfs_atomic_set(&ctxt->loc_refcount, 1);
58
59         return ctxt;
60 }
61
62 static void llog_ctxt_destroy(struct llog_ctxt *ctxt)
63 {
64         if (ctxt->loc_exp) {
65                 class_export_put(ctxt->loc_exp);
66                 ctxt->loc_exp = NULL;
67         }
68         if (ctxt->loc_imp) {
69                 class_import_put(ctxt->loc_imp);
70                 ctxt->loc_imp = NULL;
71         }
72         LASSERT(ctxt->loc_llcd == NULL);
73         OBD_FREE_PTR(ctxt);
74 }
75
76 int __llog_ctxt_put(const struct lu_env *env, struct llog_ctxt *ctxt)
77 {
78         struct obd_llog_group *olg = ctxt->loc_olg;
79         struct obd_device *obd;
80         int rc = 0;
81
82         spin_lock(&olg->olg_lock);
83         if (!cfs_atomic_dec_and_test(&ctxt->loc_refcount)) {
84                 spin_unlock(&olg->olg_lock);
85                 return rc;
86         }
87         olg->olg_ctxts[ctxt->loc_idx] = NULL;
88         spin_unlock(&olg->olg_lock);
89
90         if (ctxt->loc_lcm)
91                 lcm_put(ctxt->loc_lcm);
92
93         obd = ctxt->loc_obd;
94         spin_lock(&obd->obd_dev_lock);
95         /* sync with llog ctxt user thread */
96         spin_unlock(&obd->obd_dev_lock);
97
98         /* obd->obd_starting is needed for the case of cleanup
99          * in error case while obd is starting up. */
100         LASSERTF(obd->obd_starting == 1 ||
101                  obd->obd_stopping == 1 || obd->obd_set_up == 0,
102                  "wrong obd state: %d/%d/%d\n", !!obd->obd_starting,
103                  !!obd->obd_stopping, !!obd->obd_set_up);
104
105         /* cleanup the llog ctxt here */
106         if (CTXTP(ctxt, cleanup))
107                 rc = CTXTP(ctxt, cleanup)(env, ctxt);
108
109         llog_ctxt_destroy(ctxt);
110         cfs_waitq_signal(&olg->olg_waitq);
111         return rc;
112 }
113 EXPORT_SYMBOL(__llog_ctxt_put);
114
115 int llog_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
116 {
117         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
118         struct obd_llog_group *olg;
119         int rc, idx;
120         ENTRY;
121
122         LASSERT(ctxt != NULL);
123         LASSERT(ctxt != LP_POISON);
124
125         olg = ctxt->loc_olg;
126         LASSERT(olg != NULL);
127         LASSERT(olg != LP_POISON);
128
129         idx = ctxt->loc_idx;
130
131         /*
132          * Banlance the ctxt get when calling llog_cleanup()
133          */
134         LASSERT(cfs_atomic_read(&ctxt->loc_refcount) < LI_POISON);
135         LASSERT(cfs_atomic_read(&ctxt->loc_refcount) > 1);
136         llog_ctxt_put(ctxt);
137
138         /*
139          * Try to free the ctxt.
140          */
141         rc = __llog_ctxt_put(env, ctxt);
142         if (rc)
143                 CERROR("Error %d while cleaning up ctxt %p\n",
144                        rc, ctxt);
145
146         l_wait_event(olg->olg_waitq,
147                      llog_group_ctxt_null(olg, idx), &lwi);
148
149         RETURN(rc);
150 }
151 EXPORT_SYMBOL(llog_cleanup);
152
153 int llog_setup(const struct lu_env *env, struct obd_device *obd,
154                struct obd_llog_group *olg, int index,
155                struct obd_device *disk_obd, struct llog_operations *op)
156 {
157         struct llog_ctxt *ctxt;
158         int rc = 0;
159         ENTRY;
160
161         if (index < 0 || index >= LLOG_MAX_CTXTS)
162                 RETURN(-EINVAL);
163
164         LASSERT(olg != NULL);
165
166         ctxt = llog_new_ctxt(obd);
167         if (!ctxt)
168                 RETURN(-ENOMEM);
169
170         ctxt->loc_obd = obd;
171         ctxt->loc_olg = olg;
172         ctxt->loc_idx = index;
173         ctxt->loc_logops = op;
174         mutex_init(&ctxt->loc_mutex);
175         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
176         ctxt->loc_flags = LLOG_CTXT_FLAG_UNINITIALIZED;
177
178         rc = llog_group_set_ctxt(olg, ctxt, index);
179         if (rc) {
180                 llog_ctxt_destroy(ctxt);
181                 if (rc == -EEXIST) {
182                         ctxt = llog_group_get_ctxt(olg, index);
183                         if (ctxt) {
184                                 /*
185                                  * mds_lov_update_desc() might call here multiple
186                                  * times. So if the llog is already set up then
187                                  * don't to do it again. 
188                                  */
189                                 CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
190                                        obd->obd_name, index);
191                                 LASSERT(ctxt->loc_olg == olg);
192                                 LASSERT(ctxt->loc_obd == obd);
193                                 LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
194                                 LASSERT(ctxt->loc_logops == op);
195                                 llog_ctxt_put(ctxt);
196                         }
197                         rc = 0;
198                 }
199                 RETURN(rc);
200         }
201
202         if (op->lop_setup) {
203                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LLOG_SETUP))
204                         rc = -EOPNOTSUPP;
205                 else
206                         rc = op->lop_setup(env, obd, olg, index, disk_obd);
207         }
208
209         if (rc) {
210                 CERROR("%s: ctxt %d lop_setup=%p failed: rc = %d\n",
211                        obd->obd_name, index, op->lop_setup, rc);
212                 llog_group_clear_ctxt(olg, index);
213                 llog_ctxt_destroy(ctxt);
214         } else {
215                 CDEBUG(D_CONFIG, "obd %s ctxt %d is initialized\n",
216                        obd->obd_name, index);
217                 ctxt->loc_flags &= ~LLOG_CTXT_FLAG_UNINITIALIZED;
218         }
219
220         RETURN(rc);
221 }
222 EXPORT_SYMBOL(llog_setup);
223
224 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp, int flags)
225 {
226         int rc = 0;
227         ENTRY;
228
229         if (!ctxt)
230                 RETURN(0);
231
232         if (CTXTP(ctxt, sync))
233                 rc = CTXTP(ctxt, sync)(ctxt, exp, flags);
234
235         RETURN(rc);
236 }
237 EXPORT_SYMBOL(llog_sync);
238
239 int llog_obd_add(const struct lu_env *env, struct llog_ctxt *ctxt,
240                  struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
241                  struct llog_cookie *logcookies, int numcookies)
242 {
243         int raised, rc;
244         ENTRY;
245
246         if (!ctxt) {
247                 CERROR("No ctxt\n");
248                 RETURN(-ENODEV);
249         }
250
251         if (ctxt->loc_flags & LLOG_CTXT_FLAG_UNINITIALIZED)
252                 RETURN(-ENXIO);
253
254         CTXT_CHECK_OP(ctxt, obd_add, -EOPNOTSUPP);
255         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
256         if (!raised)
257                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
258         rc = CTXTP(ctxt, obd_add)(env, ctxt, rec, lsm, logcookies,
259                                   numcookies);
260         if (!raised)
261                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
262         RETURN(rc);
263 }
264 EXPORT_SYMBOL(llog_obd_add);
265
266 int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
267                 struct lov_stripe_md *lsm, int count,
268                 struct llog_cookie *cookies, int flags)
269 {
270         int rc;
271         ENTRY;
272
273         if (!ctxt) {
274                 CERROR("No ctxt\n");
275                 RETURN(-ENODEV);
276         }
277
278         CTXT_CHECK_OP(ctxt, cancel, -EOPNOTSUPP);
279         rc = CTXTP(ctxt, cancel)(env, ctxt, lsm, count, cookies, flags);
280         RETURN(rc);
281 }
282 EXPORT_SYMBOL(llog_cancel);
283
284 /* add for obdfilter/sz and mds/unlink */
285 int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt,
286                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
287                         struct llog_cookie *logcookies, int numcookies)
288 {
289         struct llog_handle *cathandle;
290         int rc;
291         ENTRY;
292
293         cathandle = ctxt->loc_handle;
294         LASSERT(cathandle != NULL);
295         rc = llog_cat_add(env, cathandle, rec, logcookies, NULL);
296         if (rc != 0 && rc != 1)
297                 CERROR("write one catalog record failed: %d\n", rc);
298         RETURN(rc);
299 }
300 EXPORT_SYMBOL(llog_obd_origin_add);
301
302 int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
303                   struct obd_device *disk_obd, int *index)
304 {
305         int rc;
306         ENTRY;
307         OBD_CHECK_DT_OP(obd, llog_init, 0);
308         OBD_COUNTER_INCREMENT(obd, llog_init);
309
310         rc = OBP(obd, llog_init)(obd, olg, disk_obd, index);
311         RETURN(rc);
312 }
313 EXPORT_SYMBOL(obd_llog_init);
314
315 int obd_llog_finish(struct obd_device *obd, int count)
316 {
317         int rc;
318         ENTRY;
319         OBD_CHECK_DT_OP(obd, llog_finish, 0);
320         OBD_COUNTER_INCREMENT(obd, llog_finish);
321
322         rc = OBP(obd, llog_finish)(obd, count);
323         RETURN(rc);
324 }
325 EXPORT_SYMBOL(obd_llog_finish);
326
327 /* context key constructor/destructor: llog_key_init, llog_key_fini */
328 LU_KEY_INIT_FINI(llog, struct llog_thread_info);
329 /* context key: llog_thread_key */
330 LU_CONTEXT_KEY_DEFINE(llog, LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL);
331 LU_KEY_INIT_GENERIC(llog);
332 EXPORT_SYMBOL(llog_thread_key);
333
334 int llog_info_init(void)
335 {
336         llog_key_init_generic(&llog_thread_key, NULL);
337         lu_context_key_register(&llog_thread_key);
338         return 0;
339 }
340
341 void llog_info_fini(void)
342 {
343         lu_context_key_degister(&llog_thread_key);
344 }