Whamcloud - gitweb
LU-1378 fid: Add console info for super seq allocation
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fid/fid_request.c
37  *
38  * Lustre Sequence Manager
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_FID
44
45 #ifdef __KERNEL__
46 # include <libcfs/libcfs.h>
47 # include <linux/module.h>
48 #else /* __KERNEL__ */
49 # include <liblustre.h>
50 #endif
51
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <dt_object.h>
55 #include <md_object.h>
56 #include <obd_support.h>
57 #include <lustre_req_layout.h>
58 #include <lustre_fid.h>
59 /* mdc RPC locks */
60 #include <lustre_mdc.h>
61 #include "fid_internal.h"
62
63 static int seq_client_rpc(struct lu_client_seq *seq,
64                           struct lu_seq_range *output, __u32 opc,
65                           const char *opcname)
66 {
67         struct obd_export     *exp = seq->lcs_exp;
68         struct ptlrpc_request *req;
69         struct lu_seq_range   *out, *in;
70         __u32                 *op;
71         unsigned int           debug_mask;
72         int                    rc;
73         ENTRY;
74
75         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
76                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
77         if (req == NULL)
78                 RETURN(-ENOMEM);
79
80         /* Init operation code */
81         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
82         *op = opc;
83
84         /* Zero out input range, this is not recovery yet. */
85         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
86         range_init(in);
87
88         ptlrpc_request_set_replen(req);
89
90         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
91                 req->rq_request_portal = SEQ_METADATA_PORTAL;
92                 in->lsr_flags = LU_SEQ_RANGE_MDT;
93         } else {
94                 LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA,
95                          "unknown lcs_type %u\n", seq->lcs_type);
96                 req->rq_request_portal = SEQ_DATA_PORTAL;
97                 in->lsr_flags = LU_SEQ_RANGE_OST;
98         }
99
100         if (opc == SEQ_ALLOC_SUPER) {
101                 /* Update index field of *in, it is required for
102                  * FLD update on super sequence allocator node. */
103                 in->lsr_index = seq->lcs_space.lsr_index;
104                 req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
105                 debug_mask = D_CONSOLE;
106         } else {
107                 debug_mask = D_INFO;
108                 LASSERTF(opc == SEQ_ALLOC_META,
109                          "unknown opcode %u\n, opc", opc);
110         }
111
112         ptlrpc_at_set_req_timeout(req);
113
114         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
115         rc = ptlrpc_queue_wait(req);
116         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
117
118         if (rc)
119                 GOTO(out_req, rc);
120
121         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
122         *output = *out;
123
124         if (!range_is_sane(output)) {
125                 CERROR("%s: Invalid range received from server: "
126                        DRANGE"\n", seq->lcs_name, PRANGE(output));
127                 GOTO(out_req, rc = -EINVAL);
128         }
129
130         if (range_is_exhausted(output)) {
131                 CERROR("%s: Range received from server is exhausted: "
132                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
133                 GOTO(out_req, rc = -EINVAL);
134         }
135
136         CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n",
137                      seq->lcs_name, opcname, PRANGE(output));
138
139         EXIT;
140 out_req:
141         ptlrpc_req_finished(req);
142         return rc;
143 }
144
145 /* Request sequence-controller node to allocate new super-sequence. */
146 int seq_client_alloc_super(struct lu_client_seq *seq,
147                            const struct lu_env *env)
148 {
149         int rc;
150         ENTRY;
151
152         cfs_mutex_lock(&seq->lcs_mutex);
153
154 #ifdef __KERNEL__
155         if (seq->lcs_srv) {
156                 LASSERT(env != NULL);
157                 rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space,
158                                             env);
159         } else {
160 #endif
161                 rc = seq_client_rpc(seq, &seq->lcs_space,
162                                     SEQ_ALLOC_SUPER, "super");
163 #ifdef __KERNEL__
164         }
165 #endif
166         cfs_mutex_unlock(&seq->lcs_mutex);
167         RETURN(rc);
168 }
169
170 /* Request sequence-controller node to allocate new meta-sequence. */
171 static int seq_client_alloc_meta(const struct lu_env *env,
172                                  struct lu_client_seq *seq)
173 {
174         int rc;
175         ENTRY;
176
177 #ifdef __KERNEL__
178         if (seq->lcs_srv) {
179                 LASSERT(env != NULL);
180                 rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env);
181         } else {
182 #endif
183                 rc = seq_client_rpc(seq, &seq->lcs_space,
184                                     SEQ_ALLOC_META, "meta");
185 #ifdef __KERNEL__
186         }
187 #endif
188         RETURN(rc);
189 }
190
191 /* Allocate new sequence for client. */
192 static int seq_client_alloc_seq(const struct lu_env *env,
193                                 struct lu_client_seq *seq, seqno_t *seqnr)
194 {
195         int rc;
196         ENTRY;
197
198         LASSERT(range_is_sane(&seq->lcs_space));
199
200         if (range_is_exhausted(&seq->lcs_space)) {
201                 rc = seq_client_alloc_meta(env, seq);
202                 if (rc) {
203                         CERROR("%s: Can't allocate new meta-sequence,"
204                                "rc %d\n", seq->lcs_name, rc);
205                         RETURN(rc);
206                 } else {
207                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
208                                seq->lcs_name, PRANGE(&seq->lcs_space));
209                 }
210         } else {
211                 rc = 0;
212         }
213
214         LASSERT(!range_is_exhausted(&seq->lcs_space));
215         *seqnr = seq->lcs_space.lsr_start;
216         seq->lcs_space.lsr_start += 1;
217
218         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
219                *seqnr);
220
221         RETURN(rc);
222 }
223
224 static int seq_fid_alloc_prep(struct lu_client_seq *seq,
225                               cfs_waitlink_t *link)
226 {
227         if (seq->lcs_update) {
228                 cfs_waitq_add(&seq->lcs_waitq, link);
229                 cfs_set_current_state(CFS_TASK_UNINT);
230                 cfs_mutex_unlock(&seq->lcs_mutex);
231
232                 cfs_waitq_wait(link, CFS_TASK_UNINT);
233
234                 cfs_mutex_lock(&seq->lcs_mutex);
235                 cfs_waitq_del(&seq->lcs_waitq, link);
236                 cfs_set_current_state(CFS_TASK_RUNNING);
237                 return -EAGAIN;
238         }
239         ++seq->lcs_update;
240         cfs_mutex_unlock(&seq->lcs_mutex);
241         return 0;
242 }
243
244 static void seq_fid_alloc_fini(struct lu_client_seq *seq)
245 {
246         LASSERT(seq->lcs_update == 1);
247         cfs_mutex_lock(&seq->lcs_mutex);
248         --seq->lcs_update;
249         cfs_waitq_signal(&seq->lcs_waitq);
250 }
251
252 /* Allocate the whole seq to the caller*/
253 int seq_client_get_seq(const struct lu_env *env,
254                        struct lu_client_seq *seq, seqno_t *seqnr)
255 {
256         cfs_waitlink_t link;
257         int rc;
258
259         LASSERT(seqnr != NULL);
260         cfs_mutex_lock(&seq->lcs_mutex);
261         cfs_waitlink_init(&link);
262
263         while (1) {
264                 rc = seq_fid_alloc_prep(seq, &link);
265                 if (rc == 0)
266                         break;
267         }
268
269         rc = seq_client_alloc_seq(env, seq, seqnr);
270         if (rc) {
271                 CERROR("%s: Can't allocate new sequence, "
272                        "rc %d\n", seq->lcs_name, rc);
273                 seq_fid_alloc_fini(seq);
274                 cfs_mutex_unlock(&seq->lcs_mutex);
275                 return rc;
276         }
277
278         CDEBUG(D_INFO, "%s: allocate sequence "
279                "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
280
281         /*Since the caller require the whole seq,
282          *so marked this seq to be used*/
283         seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH;
284         seq->lcs_fid.f_seq = *seqnr;
285         seq->lcs_fid.f_ver = 0;
286
287         /*
288          * Inform caller that sequence switch is performed to allow it
289          * to setup FLD for it.
290          */
291         seq_fid_alloc_fini(seq);
292         cfs_mutex_unlock(&seq->lcs_mutex);
293
294         return rc;
295 }
296 EXPORT_SYMBOL(seq_client_get_seq);
297
298 /* Allocate new fid on passed client @seq and save it to @fid. */
299 int seq_client_alloc_fid(const struct lu_env *env,
300                          struct lu_client_seq *seq, struct lu_fid *fid)
301 {
302         cfs_waitlink_t link;
303         int rc;
304         ENTRY;
305
306         LASSERT(seq != NULL);
307         LASSERT(fid != NULL);
308
309         cfs_waitlink_init(&link);
310         cfs_mutex_lock(&seq->lcs_mutex);
311
312         while (1) {
313                 seqno_t seqnr;
314
315                 if (!fid_is_zero(&seq->lcs_fid) &&
316                     fid_oid(&seq->lcs_fid) < seq->lcs_width) {
317                         /* Just bump last allocated fid and return to caller. */
318                         seq->lcs_fid.f_oid += 1;
319                         rc = 0;
320                         break;
321                 }
322
323                 rc = seq_fid_alloc_prep(seq, &link);
324                 if (rc)
325                         continue;
326
327                 rc = seq_client_alloc_seq(env, seq, &seqnr);
328                 if (rc) {
329                         CERROR("%s: Can't allocate new sequence, "
330                                "rc %d\n", seq->lcs_name, rc);
331                         seq_fid_alloc_fini(seq);
332                         cfs_mutex_unlock(&seq->lcs_mutex);
333                         RETURN(rc);
334                 }
335
336                 CDEBUG(D_INFO, "%s: Switch to sequence "
337                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
338
339                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
340                 seq->lcs_fid.f_seq = seqnr;
341                 seq->lcs_fid.f_ver = 0;
342
343                 /*
344                  * Inform caller that sequence switch is performed to allow it
345                  * to setup FLD for it.
346                  */
347                 rc = 1;
348
349                 seq_fid_alloc_fini(seq);
350                 break;
351         }
352
353         *fid = seq->lcs_fid;
354         cfs_mutex_unlock(&seq->lcs_mutex);
355
356         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
357         RETURN(rc);
358 }
359 EXPORT_SYMBOL(seq_client_alloc_fid);
360
361 /*
362  * Finish the current sequence due to disconnect.
363  * See mdc_import_event()
364  */
365 void seq_client_flush(struct lu_client_seq *seq)
366 {
367         cfs_waitlink_t link;
368
369         LASSERT(seq != NULL);
370         cfs_waitlink_init(&link);
371         cfs_mutex_lock(&seq->lcs_mutex);
372
373         while (seq->lcs_update) {
374                 cfs_waitq_add(&seq->lcs_waitq, &link);
375                 cfs_set_current_state(CFS_TASK_UNINT);
376                 cfs_mutex_unlock(&seq->lcs_mutex);
377
378                 cfs_waitq_wait(&link, CFS_TASK_UNINT);
379
380                 cfs_mutex_lock(&seq->lcs_mutex);
381                 cfs_waitq_del(&seq->lcs_waitq, &link);
382                 cfs_set_current_state(CFS_TASK_RUNNING);
383         }
384
385         fid_zero(&seq->lcs_fid);
386         /**
387          * this id shld not be used for seq range allocation.
388          * set to -1 for dgb check.
389          */
390
391         seq->lcs_space.lsr_index = -1;
392
393         range_init(&seq->lcs_space);
394         cfs_mutex_unlock(&seq->lcs_mutex);
395 }
396 EXPORT_SYMBOL(seq_client_flush);
397
398 static void seq_client_proc_fini(struct lu_client_seq *seq);
399
400 #ifdef LPROCFS
401 static int seq_client_proc_init(struct lu_client_seq *seq)
402 {
403         int rc;
404         ENTRY;
405
406         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
407                                              seq_type_proc_dir,
408                                              NULL, NULL);
409
410         if (IS_ERR(seq->lcs_proc_dir)) {
411                 CERROR("%s: LProcFS failed in seq-init\n",
412                        seq->lcs_name);
413                 rc = PTR_ERR(seq->lcs_proc_dir);
414                 RETURN(rc);
415         }
416
417         rc = lprocfs_add_vars(seq->lcs_proc_dir,
418                               seq_client_proc_list, seq);
419         if (rc) {
420                 CERROR("%s: Can't init sequence manager "
421                        "proc, rc %d\n", seq->lcs_name, rc);
422                 GOTO(out_cleanup, rc);
423         }
424
425         RETURN(0);
426
427 out_cleanup:
428         seq_client_proc_fini(seq);
429         return rc;
430 }
431
432 static void seq_client_proc_fini(struct lu_client_seq *seq)
433 {
434         ENTRY;
435         if (seq->lcs_proc_dir) {
436                 if (!IS_ERR(seq->lcs_proc_dir))
437                         lprocfs_remove(&seq->lcs_proc_dir);
438                 seq->lcs_proc_dir = NULL;
439         }
440         EXIT;
441 }
442 #else
443 static int seq_client_proc_init(struct lu_client_seq *seq)
444 {
445         return 0;
446 }
447
448 static void seq_client_proc_fini(struct lu_client_seq *seq)
449 {
450         return;
451 }
452 #endif
453
454 int seq_client_init(struct lu_client_seq *seq,
455                     struct obd_export *exp,
456                     enum lu_cli_type type,
457                     const char *prefix,
458                     struct lu_server_seq *srv)
459 {
460         int rc;
461         ENTRY;
462
463         LASSERT(seq != NULL);
464         LASSERT(prefix != NULL);
465
466         seq->lcs_exp = exp;
467         seq->lcs_srv = srv;
468         seq->lcs_type = type;
469         cfs_mutex_init(&seq->lcs_mutex);
470         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
471         cfs_waitq_init(&seq->lcs_waitq);
472
473         /* Make sure that things are clear before work is started. */
474         seq_client_flush(seq);
475
476         if (exp == NULL) {
477                 LASSERT(seq->lcs_srv != NULL);
478         } else {
479                 LASSERT(seq->lcs_exp != NULL);
480                 seq->lcs_exp = class_export_get(seq->lcs_exp);
481         }
482
483         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
484                  "cli-%s", prefix);
485
486         rc = seq_client_proc_init(seq);
487         if (rc)
488                 seq_client_fini(seq);
489         RETURN(rc);
490 }
491 EXPORT_SYMBOL(seq_client_init);
492
493 void seq_client_fini(struct lu_client_seq *seq)
494 {
495         ENTRY;
496
497         seq_client_proc_fini(seq);
498
499         if (seq->lcs_exp != NULL) {
500                 class_export_put(seq->lcs_exp);
501                 seq->lcs_exp = NULL;
502         }
503
504         seq->lcs_srv = NULL;
505         EXIT;
506 }
507 EXPORT_SYMBOL(seq_client_fini);