Whamcloud - gitweb
- recovery support in seq-mgr.
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_request.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 static int seq_client_rpc(struct lu_client_seq *seq,
51                           struct lu_range *range,
52                           __u32 opc, const char *opcname)
53 {
54         int rc, size[3] = { sizeof(struct ptlrpc_body),
55                             sizeof(__u32),
56                             sizeof(struct lu_range) };
57         struct obd_export *exp = seq->lcs_exp;
58         struct ptlrpc_request *req;
59         struct lu_range *out, *in;
60         struct req_capsule pill;
61         __u32 *op;
62         ENTRY;
63
64         req = ptlrpc_prep_req(class_exp2cliimp(exp),
65                               LUSTRE_MDS_VERSION,
66                               SEQ_QUERY, 3, size,
67                               NULL);
68         if (req == NULL)
69                 RETURN(-ENOMEM);
70
71         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
72         req_capsule_set(&pill, &RQF_SEQ_QUERY);
73
74         /* init operation code */
75         op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
76         *op = opc;
77
78         /* zero out input range, this is not recovery yet. */
79         in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
80         range_zero(in);
81         
82         size[1] = sizeof(struct lu_range);
83         ptlrpc_req_set_repsize(req, 2, size);
84
85         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
86                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
87                         SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
88         } else {
89                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
90                         SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
91         }
92
93         rc = ptlrpc_queue_wait(req);
94         if (rc)
95                 GOTO(out_req, rc);
96
97         out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
98         *range = *out;
99
100         if (!range_is_sane(range)) {
101                 CERROR("invalid seq range obtained from server: "
102                        DRANGE"\n", PRANGE(range));
103                 GOTO(out_req, rc = -EINVAL);
104         }
105
106         if (range_is_exhausted(range)) {
107                 CERROR("seq range obtained from server is exhausted: "
108                        DRANGE"]\n", PRANGE(range));
109                 GOTO(out_req, rc = -EINVAL);
110         }
111
112         /* Save server out to request for recovery case. */
113         *in = *out;
114                 
115         CDEBUG(D_INFO, "%s: allocated %s-sequence "
116                DRANGE"]\n", seq->lcs_name, opcname,
117                PRANGE(range));
118         
119         EXIT;
120 out_req:
121         req_capsule_fini(&pill);
122         ptlrpc_req_finished(req);
123         return rc;
124 }
125
126 /* request sequence-controller node to allocate new super-sequence. */
127 static int __seq_client_alloc_super(struct lu_client_seq *seq)
128 {
129         int rc;
130         
131 #ifdef __KERNEL__
132         if (seq->lcs_srv) {
133                 rc = seq_server_alloc_super(seq->lcs_srv, NULL,
134                                             &seq->lcs_range,
135                                             seq->lcs_ctx);
136         } else {
137 #endif
138                 rc = seq_client_rpc(seq, &seq->lcs_range,
139                                     SEQ_ALLOC_SUPER, "super");
140 #ifdef __KERNEL__
141         }
142 #endif
143         return rc;
144 }
145
146 int seq_client_alloc_super(struct lu_client_seq *seq)
147 {
148         int rc;
149         ENTRY;
150
151         down(&seq->lcs_sem);
152         rc = __seq_client_alloc_super(seq);
153         up(&seq->lcs_sem);
154
155         RETURN(rc);
156 }
157 EXPORT_SYMBOL(seq_client_alloc_super);
158
159 /* request sequence-controller node to allocate new meta-sequence. */
160 static int __seq_client_alloc_meta(struct lu_client_seq *seq)
161 {
162         int rc;
163
164 #ifdef __KERNEL__
165         if (seq->lcs_srv) {
166                 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
167                                            &seq->lcs_range,
168                                            seq->lcs_ctx);
169         } else {
170 #endif
171                 rc = seq_client_rpc(seq, &seq->lcs_range,
172                                     SEQ_ALLOC_META, "meta");
173 #ifdef __KERNEL__
174         }
175 #endif
176         return rc;
177 }
178
179 int seq_client_alloc_meta(struct lu_client_seq *seq)
180 {
181         int rc;
182         ENTRY;
183
184         down(&seq->lcs_sem);
185         rc = __seq_client_alloc_meta(seq);
186         up(&seq->lcs_sem);
187
188         RETURN(rc);
189 }
190 EXPORT_SYMBOL(seq_client_alloc_meta);
191
192 /* allocate new sequence for client (llite or MDC are expected to use this) */
193 static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
194 {
195         int rc = 0;
196         ENTRY;
197
198         LASSERT(range_is_sane(&seq->lcs_range));
199
200         /* if we still have free sequences in meta-sequence we allocate new seq
201          * from given range, if not - allocate new meta-sequence. */
202         if (range_space(&seq->lcs_range) == 0) {
203                 rc = __seq_client_alloc_meta(seq);
204                 if (rc) {
205                         CERROR("can't allocate new meta-sequence, "
206                                "rc %d\n", rc);
207                         RETURN(rc);
208                 }
209         }
210
211         LASSERT(range_space(&seq->lcs_range) > 0);
212         *seqnr = seq->lcs_range.lr_start;
213         seq->lcs_range.lr_start++;
214
215         CDEBUG(D_INFO, "%s: allocated sequence ["LPX64"]\n",
216                seq->lcs_name, *seqnr);
217         RETURN(rc);
218 }
219
220 int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
221 {
222         int rc = 0;
223         ENTRY;
224
225         down(&seq->lcs_sem);
226         rc = __seq_client_alloc_seq(seq, seqnr);
227         up(&seq->lcs_sem);
228
229         RETURN(rc);
230 }
231 EXPORT_SYMBOL(seq_client_alloc_seq);
232
233 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
234 {
235         int rc;
236         ENTRY;
237
238         LASSERT(fid != NULL);
239
240         down(&seq->lcs_sem);
241
242         if (!fid_is_sane(&seq->lcs_fid) ||
243             fid_oid(&seq->lcs_fid) >= seq->lcs_width)
244         {
245                 seqno_t seqnr;
246                 
247                 /* allocate new sequence for case client has no sequence at all
248                  * or sequence is exhausted and should be switched. */
249                 rc = __seq_client_alloc_seq(seq, &seqnr);
250                 if (rc) {
251                         CERROR("can't allocate new sequence, "
252                                "rc %d\n", rc);
253                         GOTO(out, rc);
254                 }
255
256                 /* init new fid */
257                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
258                 seq->lcs_fid.f_seq = seqnr;
259                 seq->lcs_fid.f_ver = 0;
260
261                 /* inform caller that sequence switch is performed to allow it
262                  * to setup FLD for it. */
263                 rc = 1;
264         } else {
265                 seq->lcs_fid.f_oid++;
266                 rc = 0;
267         }
268
269         *fid = seq->lcs_fid;
270         LASSERT(fid_is_sane(fid));
271
272         CDEBUG(D_INFO, "%s: allocated FID "DFID"\n",
273                seq->lcs_name, PFID(fid));
274
275         EXIT;
276 out:
277         up(&seq->lcs_sem);
278         return rc;
279 }
280 EXPORT_SYMBOL(seq_client_alloc_fid);
281
282 static void seq_client_proc_fini(struct lu_client_seq *seq);
283
284 #ifdef LPROCFS
285 static int seq_client_proc_init(struct lu_client_seq *seq)
286 {
287         int rc;
288         ENTRY;
289
290         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
291                                              proc_lustre_root,
292                                              NULL, NULL);
293
294         if (IS_ERR(seq->lcs_proc_dir)) {
295                 CERROR("LProcFS failed in seq-init\n");
296                 rc = PTR_ERR(seq->lcs_proc_dir);
297                 RETURN(rc);
298         }
299
300         rc = lprocfs_add_vars(seq->lcs_proc_dir,
301                               seq_client_proc_list, seq);
302         if (rc) {
303                 CERROR("can't init sequence manager "
304                        "proc, rc %d\n", rc);
305                 GOTO(out_cleanup, rc);
306         }
307
308         RETURN(0);
309
310 out_cleanup:
311         seq_client_proc_fini(seq);
312         return rc;
313 }
314
315 static void seq_client_proc_fini(struct lu_client_seq *seq)
316 {
317         ENTRY;
318         if (seq->lcs_proc_dir) {
319                 if (!IS_ERR(seq->lcs_proc_dir))
320                         lprocfs_remove(seq->lcs_proc_dir);
321                 seq->lcs_proc_dir = NULL;
322         }
323         EXIT;
324 }
325 #else
326 static int seq_client_proc_init(struct lu_client_seq *seq)
327 {
328         return 0;
329 }
330
331 static void seq_client_proc_fini(struct lu_client_seq *seq)
332 {
333         return;
334 }
335 #endif
336
337 int seq_client_init(struct lu_client_seq *seq,
338                     struct obd_export *exp,
339                     enum lu_cli_type type,
340                     const char *prefix,
341                     struct lu_server_seq *srv,
342                     const struct lu_context *ctx)
343 {
344         int rc;
345         ENTRY;
346
347         LASSERT(seq != NULL);
348         LASSERT(prefix != NULL);
349
350         seq->lcs_ctx = ctx;
351         seq->lcs_exp = exp;
352         seq->lcs_srv = srv;
353         seq->lcs_type = type;
354         fid_zero(&seq->lcs_fid);
355         range_zero(&seq->lcs_range);
356         sema_init(&seq->lcs_sem, 1);
357         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
358
359         if (exp == NULL) {
360                 LASSERT(seq->lcs_ctx != NULL);
361                 LASSERT(seq->lcs_srv != NULL);
362         } else {
363                 LASSERT(seq->lcs_exp != NULL);
364                 seq->lcs_exp = class_export_get(seq->lcs_exp);
365         }
366
367         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
368                  "%s-cli-%s", LUSTRE_SEQ_NAME, prefix);
369
370         rc = seq_client_proc_init(seq);
371         if (rc)
372                 seq_client_fini(seq);
373         else
374                 CDEBUG(D_INFO|D_WARNING,
375                        "Client Sequence Manager\n");
376         RETURN(rc);
377 }
378 EXPORT_SYMBOL(seq_client_init);
379
380 void seq_client_fini(struct lu_client_seq *seq)
381 {
382         ENTRY;
383
384         seq_client_proc_fini(seq);
385
386         if (seq->lcs_exp != NULL) {
387                 class_export_put(seq->lcs_exp);
388                 seq->lcs_exp = NULL;
389         }
390
391         seq->lcs_srv = NULL;
392
393         CDEBUG(D_INFO|D_WARNING,
394                "Client Sequence Manager\n");
395
396         EXIT;
397 }
398 EXPORT_SYMBOL(seq_client_fini);