Whamcloud - gitweb
LU-909 osd: changes to osd api
[fs/lustre-release.git] / lustre / fid / fid_store.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  * Copyright (c) 2011 Whamcloud, Inc.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lustre/fid/fid_store.c
38  *
39  * Lustre Sequence Manager
40  *
41  * Author: Yury Umanets <umka@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_FID
48
49 #ifdef __KERNEL__
50 # include <libcfs/libcfs.h>
51 # include <linux/module.h>
52 #else /* __KERNEL__ */
53 # include <liblustre.h>
54 #endif
55
56 #include <obd.h>
57 #include <obd_class.h>
58 #include <dt_object.h>
59 #include <md_object.h>
60 #include <obd_support.h>
61 #include <lustre_req_layout.h>
62 #include <lustre_fid.h>
63 #include "fid_internal.h"
64
65 #ifdef __KERNEL__
66
67 static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
68 {
69         struct lu_buf *buf;
70
71         buf = &info->sti_buf;
72         buf->lb_buf = &info->sti_space;
73         buf->lb_len = sizeof(info->sti_space);
74         return buf;
75 }
76
77 struct seq_update_callback {
78         struct dt_txn_commit_cb suc_cb;
79         struct lu_server_seq   *suc_seq;
80 };
81
82 void seq_update_cb(struct lu_env *env, struct thandle *th,
83                    struct dt_txn_commit_cb *cb, int err)
84 {
85         struct seq_update_callback *ccb;
86         ccb = container_of0(cb, struct seq_update_callback, suc_cb);
87         ccb->suc_seq->lss_need_sync = 0;
88         cfs_list_del(&ccb->suc_cb.dcb_linkage);
89         OBD_FREE_PTR(ccb);
90 }
91
92 struct thandle *seq_store_trans_create(struct lu_server_seq *seq,
93                                        const struct lu_env *env)
94 {
95         struct dt_device *dt_dev;
96
97         dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
98         return dt_trans_create(env, dt_dev);
99 }
100
101 int seq_store_trans_start(struct lu_server_seq *seq, const struct lu_env *env,
102                           struct thandle *th)
103 {
104         struct dt_device *dt_dev;
105         ENTRY;
106
107         dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
108
109         return dt_trans_start(env, dt_dev, th);
110 }
111
112 int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
113 {
114         struct seq_update_callback *ccb;
115         int rc;
116         OBD_ALLOC_PTR(ccb);
117         if (ccb == NULL)
118                 return -ENOMEM;
119
120         ccb->suc_cb.dcb_func = seq_update_cb;
121         CFS_INIT_LIST_HEAD(&ccb->suc_cb.dcb_linkage);
122         ccb->suc_seq = seq;
123         seq->lss_need_sync = 1;
124         rc = dt_trans_cb_add(th, &ccb->suc_cb);
125         if (rc)
126                 OBD_FREE_PTR(ccb);
127         return rc;
128 }
129
130 int seq_declare_store_write(struct lu_server_seq *seq,
131                             const struct lu_env *env,
132                             struct thandle *th)
133 {
134         struct dt_object *dt_obj = seq->lss_obj;
135         int rc;
136         ENTRY;
137
138         rc = dt_obj->do_body_ops->dbo_declare_write(env, dt_obj,
139                                                     sizeof(struct lu_seq_range),
140                                                     0, th);
141         return rc;
142 }
143
144 /* This function implies that caller takes care about locking. */
145 int seq_store_write(struct lu_server_seq *seq,
146                     const struct lu_env *env,
147                     struct thandle *th)
148 {
149         struct dt_object *dt_obj = seq->lss_obj;
150         struct seq_thread_info *info;
151         loff_t pos = 0;
152         int rc;
153         ENTRY;
154
155         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
156         LASSERT(info != NULL);
157
158         /* Store ranges in le format. */
159         range_cpu_to_le(&info->sti_space, &seq->lss_space);
160
161         rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
162                                             seq_store_buf(info),
163                                             &pos, th, BYPASS_CAPA, 1);
164         if (rc == sizeof(info->sti_space)) {
165                 CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
166                        seq->lss_name, PRANGE(&seq->lss_space));
167                 rc = 0;
168         } else if (rc >= 0) {
169                 rc = -EIO;
170         }
171
172
173         RETURN(rc);
174 }
175
176 int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
177                      struct lu_seq_range *out, int sync)
178 {
179         struct dt_device *dt_dev;
180         struct thandle *th;
181         int rc;
182         ENTRY;
183
184         dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
185
186         th = seq_store_trans_create(seq, env);
187         if (IS_ERR(th))
188                 RETURN(PTR_ERR(th));
189
190         rc = seq_declare_store_write(seq, env, th);
191         if (rc)
192                 GOTO(exit, rc);
193
194         if (out != NULL) {
195                 rc = fld_declare_server_create(seq->lss_site->ms_server_fld,
196                                                env, th);
197                 if (rc)
198                         GOTO(exit, rc);
199         }
200
201         rc = seq_store_trans_start(seq, env, th);
202         if (rc)
203                 GOTO(exit, rc);
204
205         rc = seq_store_write(seq, env, th);
206         if (rc) {
207                 CERROR("%s: Can't write space data, rc %d\n",
208                        seq->lss_name, rc);
209                 GOTO(exit,rc);
210         } else if (out != NULL) {
211                 rc = fld_server_create(seq->lss_site->ms_server_fld,
212                                        env, out, th);
213                 if (rc) {
214                         CERROR("%s: Can't Update fld database, rc %d\n",
215                                seq->lss_name, rc);
216                         GOTO(exit,rc);
217                 }
218         }
219
220         /* next sequence update will need sync until this update is committed
221          * in case of sync operation this is not needed obviously */
222         if (!sync)
223                 /* if callback can't be added then sync always */
224                 sync = !!seq_update_cb_add(th, seq);
225
226         th->th_sync |= sync;
227 exit:
228         dt_trans_stop(env, dt_dev, th);
229         return rc;
230 }
231
232 /*
233  * This function implies that caller takes care about locking or locking is not
234  * needed (init time).
235  */
236 int seq_store_read(struct lu_server_seq *seq,
237                    const struct lu_env *env)
238 {
239         struct dt_object *dt_obj = seq->lss_obj;
240         struct seq_thread_info *info;
241         loff_t pos = 0;
242         int rc;
243         ENTRY;
244
245         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
246         LASSERT(info != NULL);
247
248         rc = dt_obj->do_body_ops->dbo_read(env, dt_obj, seq_store_buf(info),
249                                            &pos, BYPASS_CAPA);
250
251         if (rc == sizeof(info->sti_space)) {
252                 range_le_to_cpu(&seq->lss_space, &info->sti_space);
253                 CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
254                        seq->lss_name, PRANGE(&seq->lss_space));
255                 rc = 0;
256         } else if (rc == 0) {
257                 rc = -ENODATA;
258         } else if (rc >= 0) {
259                 CERROR("%s: Read only %d bytes of %d\n", seq->lss_name,
260                        rc, (int)sizeof(info->sti_space));
261                 rc = -EIO;
262         }
263
264         RETURN(rc);
265 }
266
267 int seq_store_init(struct lu_server_seq *seq,
268                    const struct lu_env *env,
269                    struct dt_device *dt)
270 {
271         struct dt_object *dt_obj;
272         struct lu_fid fid;
273         const char *name;
274         int rc;
275         ENTRY;
276
277         name = seq->lss_type == LUSTRE_SEQ_SERVER ?
278                 LUSTRE_SEQ_SRV_NAME : LUSTRE_SEQ_CTL_NAME;
279
280         dt_obj = dt_store_open(env, dt, "", name, &fid);
281         if (!IS_ERR(dt_obj)) {
282                 seq->lss_obj = dt_obj;
283                 rc = 0;
284         } else {
285                 CERROR("%s: Can't find \"%s\" obj %d\n",
286                        seq->lss_name, name, (int)PTR_ERR(dt_obj));
287                 rc = PTR_ERR(dt_obj);
288         }
289
290         RETURN(rc);
291 }
292
293 void seq_store_fini(struct lu_server_seq *seq,
294                     const struct lu_env *env)
295 {
296         ENTRY;
297
298         if (seq->lss_obj != NULL) {
299                 if (!IS_ERR(seq->lss_obj))
300                         lu_object_put(env, &seq->lss_obj->do_lu);
301                 seq->lss_obj = NULL;
302         }
303
304         EXIT;
305 }
306 #endif