Whamcloud - gitweb
LU-6635 lfsck: block replacing the OST-object for test
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/fld/fld_request.c
33  *
34  * FLD (Fids Location Database)
35  *
36  * Author: Yury Umanets <umka@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_FLD
40
41 #include <libcfs/libcfs.h>
42 #include <linux/module.h>
43 #include <linux/math64.h>
44
45 #include <obd.h>
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lprocfs_status.h>
49 #include <lustre_req_layout.h>
50 #include <lustre_fld.h>
51 #include <lustre_mdc.h>
52 #include "fld_internal.h"
53
54 static int fld_rrb_hash(struct lu_client_fld *fld, u64 seq)
55 {
56         LASSERT(fld->lcf_count > 0);
57         return do_div(seq, fld->lcf_count);
58 }
59
60 static struct lu_fld_target *
61 fld_rrb_scan(struct lu_client_fld *fld, u64 seq)
62 {
63         struct lu_fld_target *target;
64         int hash;
65         ENTRY;
66
67         /* Because almost all of special sequence located in MDT0,
68          * it should go to index 0 directly, instead of calculating
69          * hash again, and also if other MDTs is not being connected,
70          * the fld lookup requests(for seq on MDT0) should not be
71          * blocked because of other MDTs */
72         if (fid_seq_is_norm(seq))
73                 hash = fld_rrb_hash(fld, seq);
74         else
75                 hash = 0;
76
77 again:
78         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
79                 if (target->ft_idx == hash)
80                         RETURN(target);
81         }
82
83         if (hash != 0) {
84                 /* It is possible the remote target(MDT) are not connected to
85                  * with client yet, so we will refer this to MDT0, which should
86                  * be connected during mount */
87                 hash = 0;
88                 goto again;
89         }
90
91         CERROR("%s: Can't find target by hash %d (seq %#llx). "
92                "Targets (%d):\n", fld->lcf_name, hash, seq,
93                fld->lcf_count);
94
95         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
96                 const char *srv_name = target->ft_srv != NULL  ?
97                         target->ft_srv->lsf_name : "<null>";
98                 const char *exp_name = target->ft_exp != NULL ?
99                         (char *)target->ft_exp->exp_obd->obd_uuid.uuid :
100                         "<null>";
101
102                 CERROR("  exp: 0x%p (%s), srv: 0x%p (%s), idx: %llu\n",
103                        target->ft_exp, exp_name, target->ft_srv,
104                        srv_name, target->ft_idx);
105         }
106
107         /*
108          * If target is not found, there is logical error anyway, so here is
109          * LBUG() to catch this situation.
110          */
111         LBUG();
112         RETURN(NULL);
113 }
114
115 struct lu_fld_hash fld_hash[] = {
116         {
117                 .fh_name = "RRB",
118                 .fh_hash_func = fld_rrb_hash,
119                 .fh_scan_func = fld_rrb_scan
120         },
121         {
122                 NULL,
123         }
124 };
125
126 static struct lu_fld_target *
127 fld_client_get_target(struct lu_client_fld *fld, u64 seq)
128 {
129         struct lu_fld_target *target;
130         ENTRY;
131
132         LASSERT(fld->lcf_hash != NULL);
133
134         spin_lock(&fld->lcf_lock);
135         target = fld->lcf_hash->fh_scan_func(fld, seq);
136         spin_unlock(&fld->lcf_lock);
137
138         if (target != NULL) {
139                 CDEBUG(D_INFO, "%s: Found target (idx %llu"
140                        ") by seq %#llx\n", fld->lcf_name,
141                        target->ft_idx, seq);
142         }
143
144         RETURN(target);
145 }
146
147 /*
148  * Add export to FLD. This is usually done by CMM and LMV as they are main users
149  * of FLD module.
150  */
151 int fld_client_add_target(struct lu_client_fld *fld,
152                           struct lu_fld_target *tar)
153 {
154         const char *name;
155         struct lu_fld_target *target, *tmp;
156         ENTRY;
157
158         LASSERT(tar != NULL);
159         name = fld_target_name(tar);
160         LASSERT(name != NULL);
161         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
162
163         CDEBUG(D_INFO, "%s: Adding target %s (idx %llu)\n", fld->lcf_name,
164                name, tar->ft_idx);
165
166         OBD_ALLOC_PTR(target);
167         if (target == NULL)
168                 RETURN(-ENOMEM);
169
170         spin_lock(&fld->lcf_lock);
171         list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
172                 if (tmp->ft_idx == tar->ft_idx) {
173                         spin_unlock(&fld->lcf_lock);
174                         OBD_FREE_PTR(target);
175                         CERROR("Target %s exists in FLD and known as %s:#%llu\n",
176                                name, fld_target_name(tmp), tmp->ft_idx);
177                         RETURN(-EEXIST);
178                 }
179         }
180
181         target->ft_exp = tar->ft_exp;
182         if (target->ft_exp != NULL)
183                 class_export_get(target->ft_exp);
184         target->ft_srv = tar->ft_srv;
185         target->ft_idx = tar->ft_idx;
186
187         list_add_tail(&target->ft_chain, &fld->lcf_targets);
188
189         fld->lcf_count++;
190         spin_unlock(&fld->lcf_lock);
191
192         RETURN(0);
193 }
194 EXPORT_SYMBOL(fld_client_add_target);
195
196 /* Remove export from FLD */
197 int fld_client_del_target(struct lu_client_fld *fld, __u64 idx)
198 {
199         struct lu_fld_target *target, *tmp;
200         ENTRY;
201
202         spin_lock(&fld->lcf_lock);
203         list_for_each_entry_safe(target, tmp, &fld->lcf_targets, ft_chain) {
204                 if (target->ft_idx == idx) {
205                         fld->lcf_count--;
206                         list_del(&target->ft_chain);
207                         spin_unlock(&fld->lcf_lock);
208
209                         if (target->ft_exp != NULL)
210                                 class_export_put(target->ft_exp);
211
212                         OBD_FREE_PTR(target);
213                         RETURN(0);
214                 }
215         }
216         spin_unlock(&fld->lcf_lock);
217         RETURN(-ENOENT);
218 }
219
220 #ifdef CONFIG_PROC_FS
221 static int fld_client_proc_init(struct lu_client_fld *fld)
222 {
223         int rc;
224         ENTRY;
225
226         fld->lcf_proc_dir = lprocfs_register(fld->lcf_name, fld_type_proc_dir,
227                                              NULL, NULL);
228         if (IS_ERR(fld->lcf_proc_dir)) {
229                 CERROR("%s: LProcFS failed in fld-init\n",
230                        fld->lcf_name);
231                 rc = PTR_ERR(fld->lcf_proc_dir);
232                 RETURN(rc);
233         }
234
235         rc = lprocfs_add_vars(fld->lcf_proc_dir, fld_client_proc_list, fld);
236         if (rc) {
237                 CERROR("%s: Can't init FLD proc, rc %d\n",
238                        fld->lcf_name, rc);
239                 GOTO(out_cleanup, rc);
240         }
241
242         RETURN(0);
243
244 out_cleanup:
245         fld_client_proc_fini(fld);
246         return rc;
247 }
248
249 void fld_client_proc_fini(struct lu_client_fld *fld)
250 {
251         ENTRY;
252         if (fld->lcf_proc_dir) {
253                 if (!IS_ERR(fld->lcf_proc_dir))
254                         lprocfs_remove(&fld->lcf_proc_dir);
255                 fld->lcf_proc_dir = NULL;
256         }
257         EXIT;
258 }
259 #else /* !CONFIG_PROC_FS */
/* Built without CONFIG_PROC_FS: there is nothing to register. */
static int fld_client_proc_init(struct lu_client_fld *fld)
{
        return 0;
}

/* Built without CONFIG_PROC_FS: there is nothing to tear down. */
void fld_client_proc_fini(struct lu_client_fld *fld)
{
}
269 #endif /* CONFIG_PROC_FS */
270
271 EXPORT_SYMBOL(fld_client_proc_fini);
272
273 static inline int hash_is_sane(int hash)
274 {
275         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
276 }
277
278 int fld_client_init(struct lu_client_fld *fld,
279                     const char *prefix, int hash)
280 {
281         int cache_size, cache_threshold;
282         int rc;
283         ENTRY;
284
285         LASSERT(fld != NULL);
286
287         snprintf(fld->lcf_name, sizeof(fld->lcf_name),
288                  "cli-%s", prefix);
289
290         if (!hash_is_sane(hash)) {
291                 CERROR("%s: Wrong hash function %#x\n",
292                        fld->lcf_name, hash);
293                 RETURN(-EINVAL);
294         }
295
296         fld->lcf_count = 0;
297         spin_lock_init(&fld->lcf_lock);
298         fld->lcf_hash = &fld_hash[hash];
299         INIT_LIST_HEAD(&fld->lcf_targets);
300
301         cache_size = FLD_CLIENT_CACHE_SIZE /
302                 sizeof(struct fld_cache_entry);
303
304         cache_threshold = cache_size *
305                 FLD_CLIENT_CACHE_THRESHOLD / 100;
306
307         fld->lcf_cache = fld_cache_init(fld->lcf_name,
308                                         cache_size, cache_threshold);
309         if (IS_ERR(fld->lcf_cache)) {
310                 rc = PTR_ERR(fld->lcf_cache);
311                 fld->lcf_cache = NULL;
312                 GOTO(out, rc);
313         }
314
315         rc = fld_client_proc_init(fld);
316         if (rc)
317                 GOTO(out, rc);
318         EXIT;
319 out:
320         if (rc)
321                 fld_client_fini(fld);
322         else
323                 CDEBUG(D_INFO, "%s: Using \"%s\" hash\n",
324                        fld->lcf_name, fld->lcf_hash->fh_name);
325         return rc;
326 }
327 EXPORT_SYMBOL(fld_client_init);
328
329 void fld_client_fini(struct lu_client_fld *fld)
330 {
331         struct lu_fld_target *target, *tmp;
332         ENTRY;
333
334         spin_lock(&fld->lcf_lock);
335         list_for_each_entry_safe(target, tmp, &fld->lcf_targets, ft_chain) {
336                 fld->lcf_count--;
337                 list_del(&target->ft_chain);
338                 if (target->ft_exp != NULL)
339                         class_export_put(target->ft_exp);
340                 OBD_FREE_PTR(target);
341         }
342         spin_unlock(&fld->lcf_lock);
343
344         if (fld->lcf_cache != NULL) {
345                 if (!IS_ERR(fld->lcf_cache))
346                         fld_cache_fini(fld->lcf_cache);
347                 fld->lcf_cache = NULL;
348         }
349
350         EXIT;
351 }
352 EXPORT_SYMBOL(fld_client_fini);
353
354 int fld_client_rpc(struct obd_export *exp,
355                    struct lu_seq_range *range, __u32 fld_op,
356                    struct ptlrpc_request **reqp)
357 {
358         struct ptlrpc_request *req = NULL;
359         struct lu_seq_range   *prange;
360         __u32                 *op;
361         int                    rc = 0;
362         struct obd_import     *imp;
363         ENTRY;
364
365         LASSERT(exp != NULL);
366
367 again:
368         imp = class_exp2cliimp(exp);
369         switch (fld_op) {
370         case FLD_QUERY:
371                 req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_QUERY,
372                                                 LUSTRE_MDS_VERSION, FLD_QUERY);
373                 if (req == NULL)
374                         RETURN(-ENOMEM);
375
376                 /* XXX: only needed when talking to old server(< 2.6), it should
377                  * be removed when < 2.6 server is not supported */
378                 op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
379                 *op = FLD_LOOKUP;
380
381                 /* For MDS_MDS seq lookup, it will always use LWP connection,
382                  * but LWP will be evicted after restart, so cause the error.
383                  * so we will set no_delay for seq lookup request, once the
384                  * request fails because of the eviction. always retry here */
385                 if (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) {
386                         req->rq_allow_replay = 1;
387                         req->rq_no_delay = 1;
388                 }
389                 break;
390         case FLD_READ:
391                 req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_READ,
392                                                 LUSTRE_MDS_VERSION, FLD_READ);
393                 if (req == NULL)
394                         RETURN(-ENOMEM);
395
396                 req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA,
397                                      RCL_SERVER, PAGE_SIZE);
398                 break;
399         default:
400                 rc = -EINVAL;
401                 break;
402         }
403
404         if (rc != 0)
405                 RETURN(rc);
406
407         prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
408         *prange = *range;
409         ptlrpc_request_set_replen(req);
410         req->rq_request_portal = FLD_REQUEST_PORTAL;
411         req->rq_reply_portal = MDC_REPLY_PORTAL;
412         ptlrpc_at_set_req_timeout(req);
413
414         obd_get_request_slot(&exp->exp_obd->u.cli);
415         rc = ptlrpc_queue_wait(req);
416         obd_put_request_slot(&exp->exp_obd->u.cli);
417
418         if (rc == -ENOENT) {
419                 /* Don't loop forever on non-existing FID sequences. */
420                 GOTO(out_req, rc);
421         }
422
423         if (rc != 0) {
424                 if (imp->imp_state != LUSTRE_IMP_CLOSED &&
425                     !imp->imp_deactive &&
426                     imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS &&
427                     rc != -ENOTSUPP) {
428                         /* Since LWP is not replayable, so it will keep
429                          * trying unless umount happens or the remote
430                          * target does not support the operation, otherwise
431                          * it would cause unecessary failure of the
432                          * application. */
433                         ptlrpc_req_finished(req);
434                         rc = 0;
435                         goto again;
436                 }
437                 GOTO(out_req, rc);
438         }
439
440         if (fld_op == FLD_QUERY) {
441                 prange = req_capsule_server_get(&req->rq_pill,
442                                                 &RMF_FLD_MDFLD);
443                 if (prange == NULL)
444                         GOTO(out_req, rc = -EFAULT);
445                 *range = *prange;
446         }
447
448         EXIT;
449 out_req:
450         if (rc != 0 || reqp == NULL) {
451                 ptlrpc_req_finished(req);
452                 req = NULL;
453         }
454
455         if (reqp != NULL)
456                 *reqp = req;
457
458         return rc;
459 }
460
461 int fld_client_lookup(struct lu_client_fld *fld, u64 seq, u32 *mds,
462                       __u32 flags, const struct lu_env *env)
463 {
464         struct lu_seq_range res = { 0 };
465         struct lu_fld_target *target;
466         struct lu_fld_target *origin;
467         int rc;
468         ENTRY;
469
470         rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
471         if (rc == 0) {
472                 *mds = res.lsr_index;
473                 RETURN(0);
474         }
475
476         /* Can not find it in the cache */
477         target = fld_client_get_target(fld, seq);
478         LASSERT(target != NULL);
479         origin = target;
480 again:
481         CDEBUG(D_INFO, "%s: Lookup fld entry (seq: %#llx) on "
482                "target %s (idx %llu)\n", fld->lcf_name, seq,
483                fld_target_name(target), target->ft_idx);
484
485         res.lsr_start = seq;
486         fld_range_set_type(&res, flags);
487
488 #ifdef HAVE_SERVER_SUPPORT
489         if (target->ft_srv != NULL) {
490                 LASSERT(env != NULL);
491                 rc = fld_server_lookup(env, target->ft_srv, seq, &res);
492         } else
493 #endif /* HAVE_SERVER_SUPPORT */
494         {
495                 rc = fld_client_rpc(target->ft_exp, &res, FLD_QUERY, NULL);
496         }
497
498         if (rc == -ESHUTDOWN) {
499                 /* If fld lookup failed because the target has been shutdown,
500                  * then try next target in the list, until trying all targets
501                  * or fld lookup succeeds */
502                 spin_lock(&fld->lcf_lock);
503
504                 /* If the next entry in the list is the head of the list,
505                  * move to the next entry after the head and retrieve
506                  * the target. Else retreive the next target entry. */
507
508                 if (target->ft_chain.next == &fld->lcf_targets)
509                         target = list_entry(target->ft_chain.next->next,
510                                             struct lu_fld_target, ft_chain);
511                 else
512                         target = list_entry(target->ft_chain.next,
513                                                  struct lu_fld_target,
514                                                  ft_chain);
515                 spin_unlock(&fld->lcf_lock);
516                 if (target != origin)
517                         goto again;
518         }
519         if (rc == 0) {
520                 *mds = res.lsr_index;
521                 fld_cache_insert(fld->lcf_cache, &res);
522         }
523
524         RETURN(rc);
525 }
526 EXPORT_SYMBOL(fld_client_lookup);
527
528 void fld_client_flush(struct lu_client_fld *fld)
529 {
530         fld_cache_flush(fld->lcf_cache);
531 }
532
533
534 struct proc_dir_entry *fld_type_proc_dir;
535
536 static int __init fld_init(void)
537 {
538         fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
539                                              proc_lustre_root,
540                                              NULL, NULL);
541         if (IS_ERR(fld_type_proc_dir))
542                 return PTR_ERR(fld_type_proc_dir);
543
544 #ifdef HAVE_SERVER_SUPPORT
545         fld_server_mod_init();
546 #endif /* HAVE_SERVER_SUPPORT */
547
548         return 0;
549 }
550
551 static void __exit fld_exit(void)
552 {
553 #ifdef HAVE_SERVER_SUPPORT
554         fld_server_mod_exit();
555 #endif /* HAVE_SERVER_SUPPORT */
556
557         if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) {
558                 lprocfs_remove(&fld_type_proc_dir);
559                 fld_type_proc_dir = NULL;
560         }
561 }
562
563 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
564 MODULE_DESCRIPTION("Lustre FID Location Database");
565 MODULE_VERSION(LUSTRE_VERSION_STRING);
566 MODULE_LICENSE("GPL");
567
568 module_init(fld_init);
569 module_exit(fld_exit);