Whamcloud - gitweb
LU-10467 ptlrpc: convert final users of LWI_TIMEOUT_INTERVAL
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2017, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <uapi/linux/lnet/lnet-types.h>
54 #include <uapi/linux/lustre/lustre_idl.h>
55 #include <dt_object.h>
56 #include <lu_object.h>
57 #include <lustre_net.h>
58 #include <lustre_nodemap.h>
59 #include <obd_class.h>
60 #include <obd_support.h>
61 #include "nodemap_internal.h"
62
63 /* list of registered nodemap index files, except MGS */
64 static LIST_HEAD(ncf_list_head);
65 static DEFINE_MUTEX(ncf_list_lock);
66
67 /* MGS index is different than others, others are listeners to MGS idx */
68 static struct nm_config_file *nodemap_mgs_ncf;
69
70 /* lu_nodemap flags */
71 enum nm_flag_shifts {
72         NM_FL_ALLOW_ROOT_ACCESS = 0x1,
73         NM_FL_TRUST_CLIENT_IDS = 0x2,
74         NM_FL_DENY_UNKNOWN = 0x4,
75         NM_FL_MAP_UID_ONLY = 0x8,
76         NM_FL_MAP_GID_ONLY = 0x10,
77         NM_FL_ENABLE_AUDIT = 0x20,
78 };
79
80 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id)
81 {
82         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
83                                                         NODEMAP_CLUSTER_IDX));
84         nk->nk_unused = 0;
85 }
86
87 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
88                                      const struct lu_nodemap *nodemap)
89 {
90         BUILD_BUG_ON(sizeof(nr->ncr.ncr_name) != sizeof(nodemap->nm_name));
91
92         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nr->ncr.ncr_name));
93         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
94         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
95         nr->ncr.ncr_flags = cpu_to_le32(
96                 (nodemap->nmf_trust_client_ids ?
97                         NM_FL_TRUST_CLIENT_IDS : 0) |
98                 (nodemap->nmf_allow_root_access ?
99                         NM_FL_ALLOW_ROOT_ACCESS : 0) |
100                 (nodemap->nmf_deny_unknown ?
101                         NM_FL_DENY_UNKNOWN : 0) |
102                 (nodemap->nmf_map_uid_only ?
103                         NM_FL_MAP_UID_ONLY : 0) |
104                 (nodemap->nmf_map_gid_only ?
105                         NM_FL_MAP_GID_ONLY : 0) |
106                 (nodemap->nmf_enable_audit ?
107                         NM_FL_ENABLE_AUDIT : 0));
108 }
109
110 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
111                                    enum nodemap_id_type id_type,
112                                    u32 id_client)
113 {
114         enum nodemap_idx_type idx_type;
115
116         if (id_type == NODEMAP_UID)
117                 idx_type = NODEMAP_UIDMAP_IDX;
118         else
119                 idx_type = NODEMAP_GIDMAP_IDX;
120
121         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
122         nk->nk_id_client = cpu_to_le32(id_client);
123 }
124
125 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
126 {
127         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
128 }
129
130 static void nodemap_range_key_init(struct nodemap_key *nk, unsigned int nm_id,
131                                    unsigned int rn_id)
132 {
133         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
134                                                         NODEMAP_RANGE_IDX));
135         nk->nk_range_id = cpu_to_le32(rn_id);
136 }
137
138 static void nodemap_range_rec_init(union nodemap_rec *nr,
139                                    const lnet_nid_t nid[2])
140 {
141         nr->nrr.nrr_start_nid = cpu_to_le64(nid[0]);
142         nr->nrr.nrr_end_nid = cpu_to_le64(nid[1]);
143 }
144
145 static void nodemap_global_key_init(struct nodemap_key *nk)
146 {
147         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
148         nk->nk_unused = 0;
149 }
150
151 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
152 {
153         nr->ngr.ngr_is_active = active;
154 }
155
156 /* should be called with dt_write lock */
157 static void nodemap_inc_version(const struct lu_env *env,
158                                 struct dt_object *nodemap_idx,
159                                 struct thandle *th)
160 {
161         u64 ver = dt_version_get(env, nodemap_idx);
162         dt_version_set(env, nodemap_idx, ver + 1, th);
163 }
164
165 enum ncfc_find_create {
166         NCFC_CREATE_NEW = 1,
167 };
168
169 static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
170                                                    struct dt_device *dev,
171                                                    struct local_oid_storage *los,
172                                                    enum ncfc_find_create create_new)
173 {
174         struct lu_fid tfid;
175         struct dt_object *root_obj;
176         struct dt_object *nm_obj;
177         int rc = 0;
178
179         rc = dt_root_get(env, dev, &tfid);
180         if (rc < 0)
181                 GOTO(out, nm_obj = ERR_PTR(rc));
182
183         root_obj = dt_locate(env, dev, &tfid);
184         if (unlikely(IS_ERR(root_obj)))
185                 GOTO(out, nm_obj = root_obj);
186
187         rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
188         if (rc == -ENOENT) {
189                 if (dev->dd_rdonly)
190                         GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
191         } else if (rc) {
192                 GOTO(out_root, nm_obj = ERR_PTR(rc));
193         } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
194                 GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
195         }
196
197 again:
198         /* if loading index fails the first time, create new index */
199         if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
200                 CDEBUG(D_INFO, "removing old index, creating new one\n");
201                 rc = local_object_unlink(env, dev, root_obj,
202                                          LUSTRE_NODEMAP_NAME);
203                 if (rc < 0) {
204                         /* XXX not sure the best way to get obd name. */
205                         CERROR("cannot destroy nodemap index: rc = %d\n",
206                                rc);
207                         GOTO(out_root, nm_obj = ERR_PTR(rc));
208                 }
209         }
210
211         nm_obj = local_index_find_or_create(env, los, root_obj,
212                                                 LUSTRE_NODEMAP_NAME,
213                                                 S_IFREG | S_IRUGO | S_IWUSR,
214                                                 &dt_nodemap_features);
215         if (IS_ERR(nm_obj))
216                 GOTO(out_root, nm_obj);
217
218         if (nm_obj->do_index_ops == NULL) {
219                 rc = nm_obj->do_ops->do_index_try(env, nm_obj,
220                                                       &dt_nodemap_features);
221                 /* even if loading from tgt fails, connecting to MGS will
222                  * rewrite the config
223                  */
224                 if (rc < 0) {
225                         dt_object_put(env, nm_obj);
226
227                         if (create_new == NCFC_CREATE_NEW)
228                                 GOTO(out_root, nm_obj = ERR_PTR(rc));
229
230                         CERROR("cannot load nodemap index from disk, creating "
231                                "new index: rc = %d\n", rc);
232                         create_new = NCFC_CREATE_NEW;
233                         goto again;
234                 }
235         }
236
237 out_root:
238         dt_object_put(env, root_obj);
239 out:
240         return nm_obj;
241 }
242
243 static int nodemap_idx_insert(const struct lu_env *env,
244                               struct dt_object *idx,
245                               const struct nodemap_key *nk,
246                               const union nodemap_rec *nr)
247 {
248         struct thandle *th;
249         struct dt_device *dev = lu2dt_dev(idx->do_lu.lo_dev);
250         int rc;
251
252         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
253
254         th = dt_trans_create(env, dev);
255
256         if (IS_ERR(th))
257                 GOTO(out, rc = PTR_ERR(th));
258
259         rc = dt_declare_insert(env, idx,
260                                (const struct dt_rec *)nr,
261                                (const struct dt_key *)nk, th);
262         if (rc != 0)
263                 GOTO(out, rc);
264
265         rc = dt_declare_version_set(env, idx, th);
266         if (rc != 0)
267                 GOTO(out, rc);
268
269         rc = dt_trans_start_local(env, dev, th);
270         if (rc != 0)
271                 GOTO(out, rc);
272
273         dt_write_lock(env, idx, 0);
274
275         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
276                        (const struct dt_key *)nk, th);
277
278         nodemap_inc_version(env, idx, th);
279         dt_write_unlock(env, idx);
280 out:
281         dt_trans_stop(env, dev, th);
282
283         return rc;
284 }
285
286 static int nodemap_idx_update(const struct lu_env *env,
287                               struct dt_object *idx,
288                               const struct nodemap_key *nk,
289                               const union nodemap_rec *nr)
290 {
291         struct thandle          *th;
292         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
293         int                      rc = 0;
294
295         th = dt_trans_create(env, dev);
296
297         if (IS_ERR(th))
298                 GOTO(out, rc = PTR_ERR(th));
299
300         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
301         if (rc != 0)
302                 GOTO(out, rc);
303
304         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
305                                (const struct dt_key *)nk, th);
306         if (rc != 0)
307                 GOTO(out, rc);
308
309         rc = dt_declare_version_set(env, idx, th);
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         rc = dt_trans_start_local(env, dev, th);
314         if (rc != 0)
315                 GOTO(out, rc);
316
317         dt_write_lock(env, idx, 0);
318
319         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
320         if (rc != 0)
321                 GOTO(out_lock, rc);
322
323         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
324                        (const struct dt_key *)nk, th);
325         if (rc != 0)
326                 GOTO(out_lock, rc);
327
328         nodemap_inc_version(env, idx, th);
329 out_lock:
330         dt_write_unlock(env, idx);
331 out:
332         dt_trans_stop(env, dev, th);
333
334         return rc;
335 }
336
337 static int nodemap_idx_delete(const struct lu_env *env,
338                               struct dt_object *idx,
339                               const struct nodemap_key *nk,
340                               const union nodemap_rec *unused)
341 {
342         struct thandle          *th;
343         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
344         int                      rc = 0;
345
346         th = dt_trans_create(env, dev);
347
348         if (IS_ERR(th))
349                 GOTO(out, rc = PTR_ERR(th));
350
351         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
352         if (rc != 0)
353                 GOTO(out, rc);
354
355         rc = dt_declare_version_set(env, idx, th);
356         if (rc != 0)
357                 GOTO(out, rc);
358
359         rc = dt_trans_start_local(env, dev, th);
360         if (rc != 0)
361                 GOTO(out, rc);
362
363         dt_write_lock(env, idx, 0);
364
365         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
366
367         nodemap_inc_version(env, idx, th);
368
369         dt_write_unlock(env, idx);
370 out:
371         dt_trans_stop(env, dev, th);
372
373         return rc;
374 }
375
376 enum nm_add_update {
377         NM_ADD = 0,
378         NM_UPDATE = 1,
379 };
380
381 static int nodemap_idx_nodemap_add_update(const struct lu_nodemap *nodemap,
382                                           struct dt_object *idx,
383                                           enum nm_add_update update)
384 {
385         struct nodemap_key nk;
386         union nodemap_rec nr;
387         struct lu_env env;
388         int rc = 0;
389
390         ENTRY;
391
392         rc = lu_env_init(&env, LCT_LOCAL);
393         if (rc)
394                 RETURN(rc);
395
396         nodemap_cluster_key_init(&nk, nodemap->nm_id);
397         nodemap_cluster_rec_init(&nr, nodemap);
398
399         if (update == NM_UPDATE)
400                 rc = nodemap_idx_update(&env, idx, &nk, &nr);
401         else
402                 rc = nodemap_idx_insert(&env, idx, &nk, &nr);
403
404         lu_env_fini(&env);
405
406         RETURN(rc);
407 }
408
409 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
410 {
411         if (nodemap_mgs_ncf == NULL) {
412                 CERROR("cannot add nodemap config to non-existing MGS.\n");
413                 return -EINVAL;
414         }
415
416         return nodemap_idx_nodemap_add_update(nodemap, nodemap_mgs_ncf->ncf_obj,
417                                               NM_ADD);
418 }
419
420 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
421 {
422         if (nodemap_mgs_ncf == NULL) {
423                 CERROR("cannot add nodemap config to non-existing MGS.\n");
424                 return -EINVAL;
425         }
426
427         return nodemap_idx_nodemap_add_update(nodemap, nodemap_mgs_ncf->ncf_obj,
428                                               NM_UPDATE);
429 }
430
431 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
432 {
433         struct rb_root           root;
434         struct lu_idmap         *idmap;
435         struct lu_idmap         *temp;
436         struct lu_nid_range     *range;
437         struct lu_nid_range     *range_temp;
438         struct nodemap_key       nk;
439         struct lu_env            env;
440         int                      rc = 0;
441         int                      rc2 = 0;
442
443         ENTRY;
444
445         if (nodemap_mgs_ncf == NULL) {
446                 CERROR("cannot add nodemap config to non-existing MGS.\n");
447                 return -EINVAL;
448         }
449
450         rc = lu_env_init(&env, LCT_LOCAL);
451         if (rc != 0)
452                 RETURN(rc);
453
454         root = nodemap->nm_fs_to_client_uidmap;
455         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
456                                                 id_fs_to_client) {
457                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
458                                        idmap->id_client);
459                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
460                                          &nk, NULL);
461                 if (rc2 < 0)
462                         rc = rc2;
463         }
464
465         root = nodemap->nm_client_to_fs_gidmap;
466         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
467                                                 id_client_to_fs) {
468                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
469                                        idmap->id_client);
470                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
471                                          &nk, NULL);
472                 if (rc2 < 0)
473                         rc = rc2;
474         }
475
476         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
477                                  rn_list) {
478                 nodemap_range_key_init(&nk, nodemap->nm_id, range->rn_id);
479                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
480                                          &nk, NULL);
481                 if (rc2 < 0)
482                         rc = rc2;
483         }
484
485         nodemap_cluster_key_init(&nk, nodemap->nm_id);
486         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
487         if (rc2 < 0)
488                 rc = rc2;
489
490         lu_env_fini(&env);
491
492         RETURN(rc);
493 }
494
495 int nodemap_idx_range_add(const struct lu_nid_range *range,
496                           const lnet_nid_t nid[2])
497 {
498         struct nodemap_key       nk;
499         union nodemap_rec        nr;
500         struct lu_env            env;
501         int                      rc = 0;
502         ENTRY;
503
504         if (nodemap_mgs_ncf == NULL) {
505                 CERROR("cannot add nodemap config to non-existing MGS.\n");
506                 return -EINVAL;
507         }
508
509         rc = lu_env_init(&env, LCT_LOCAL);
510         if (rc != 0)
511                 RETURN(rc);
512
513         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
514         nodemap_range_rec_init(&nr, nid);
515
516         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
517         lu_env_fini(&env);
518
519         RETURN(rc);
520 }
521
522 int nodemap_idx_range_del(const struct lu_nid_range *range)
523 {
524         struct nodemap_key       nk;
525         struct lu_env            env;
526         int                      rc = 0;
527         ENTRY;
528
529         if (nodemap_mgs_ncf == NULL) {
530                 CERROR("cannot add nodemap config to non-existing MGS.\n");
531                 return -EINVAL;
532         }
533
534         rc = lu_env_init(&env, LCT_LOCAL);
535         if (rc != 0)
536                 RETURN(rc);
537
538         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
539
540         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
541         lu_env_fini(&env);
542
543         RETURN(rc);
544 }
545
546 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
547                           enum nodemap_id_type id_type,
548                           const u32 map[2])
549 {
550         struct nodemap_key       nk;
551         union nodemap_rec        nr;
552         struct lu_env            env;
553         int                      rc = 0;
554         ENTRY;
555
556         if (nodemap_mgs_ncf == NULL) {
557                 CERROR("cannot add nodemap config to non-existing MGS.\n");
558                 return -EINVAL;
559         }
560
561         rc = lu_env_init(&env, LCT_LOCAL);
562         if (rc != 0)
563                 RETURN(rc);
564
565         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
566         nodemap_idmap_rec_init(&nr, map[1]);
567
568         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
569         lu_env_fini(&env);
570
571         RETURN(rc);
572 }
573
574 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
575                           enum nodemap_id_type id_type,
576                           const u32 map[2])
577 {
578         struct nodemap_key       nk;
579         struct lu_env            env;
580         int                      rc = 0;
581         ENTRY;
582
583         if (nodemap_mgs_ncf == NULL) {
584                 CERROR("cannot add nodemap config to non-existing MGS.\n");
585                 return -EINVAL;
586         }
587
588         rc = lu_env_init(&env, LCT_LOCAL);
589         if (rc != 0)
590                 RETURN(rc);
591
592         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
593
594         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
595         lu_env_fini(&env);
596
597         RETURN(rc);
598 }
599
600 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
601 {
602         struct nodemap_key       nk;
603         union nodemap_rec        nr;
604         struct lu_env            env;
605         int                      rc = 0;
606         ENTRY;
607
608         if (nodemap_mgs_ncf == NULL) {
609                 CERROR("cannot add nodemap config to non-existing MGS.\n");
610                 return -EINVAL;
611         }
612
613         rc = lu_env_init(&env, LCT_LOCAL);
614         if (rc != 0)
615                 RETURN(rc);
616
617         nodemap_global_key_init(&nk);
618         nodemap_global_rec_init(&nr, value);
619
620         if (update == NM_UPDATE)
621                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
622                                         &nk, &nr);
623         else
624                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
625                                         &nk, &nr);
626
627         lu_env_fini(&env);
628
629         RETURN(rc);
630 }
631
632 int nodemap_idx_nodemap_activate(bool value)
633 {
634         return nodemap_idx_global_add_update(value, NM_UPDATE);
635 }
636
637 static enum nodemap_idx_type nodemap_get_key_type(const struct nodemap_key *key)
638 {
639         u32                      nodemap_id;
640
641         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
642         return nm_idx_get_type(nodemap_id);
643 }
644
645 /**
646  * Process a key/rec pair and modify the new configuration.
647  *
648  * \param       config          configuration to update with this key/rec data
649  * \param       key             key of the record that was loaded
650  * \param       rec             record that was loaded
651  * \param       recent_nodemap  last referenced nodemap
652  * \retval      type of record processed, see enum #nodemap_idx_type
653  * \retval      -ENOENT         range or map loaded before nodemap record
654  * \retval      -EINVAL         duplicate nodemap cluster records found with
655  *                              different IDs, or nodemap has invalid name
656  * \retval      -ENOMEM
657  */
658 static int nodemap_process_keyrec(struct nodemap_config *config,
659                                   const struct nodemap_key *key,
660                                   const union nodemap_rec *rec,
661                                   struct lu_nodemap **recent_nodemap)
662 {
663         struct lu_nodemap *nodemap = NULL;
664         enum nodemap_idx_type type;
665         enum nodemap_id_type id_type;
666         u8 flags;
667         u32 nodemap_id;
668         lnet_nid_t nid[2];
669         u32 map[2];
670         int rc;
671
672         ENTRY;
673
674         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
675
676         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
677         type = nodemap_get_key_type(key);
678         nodemap_id = nm_idx_set_type(nodemap_id, 0);
679
680         CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
681                nodemap_id, type);
682
683         /* find the correct nodemap in the load list */
684         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
685             type == NODEMAP_GIDMAP_IDX) {
686                 struct lu_nodemap *tmp = NULL;
687
688                 nodemap = *recent_nodemap;
689
690                 if (nodemap == NULL)
691                         GOTO(out, rc = -ENOENT);
692
693                 if (nodemap->nm_id != nodemap_id) {
694                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
695                                 if (tmp->nm_id == nodemap_id) {
696                                         nodemap = tmp;
697                                         break;
698                                 }
699
700                         if (nodemap->nm_id != nodemap_id)
701                                 GOTO(out, rc = -ENOENT);
702                 }
703
704                 /* update most recently used nodemap if necessay */
705                 if (nodemap != *recent_nodemap)
706                         *recent_nodemap = nodemap;
707         }
708
709         switch (type) {
710         case NODEMAP_EMPTY_IDX:
711                 if (nodemap_id != 0)
712                         CWARN("Found nodemap config record without type field, "
713                               " nodemap_id=%d. nodemap config file corrupt?\n",
714                               nodemap_id);
715                 break;
716         case NODEMAP_CLUSTER_IDX: {
717                 struct lu_nodemap *old_nm = NULL;
718
719                 nodemap = cfs_hash_lookup(config->nmc_nodemap_hash,
720                                           rec->ncr.ncr_name);
721                 if (nodemap == NULL) {
722                         if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID) {
723                                 nodemap = nodemap_create(rec->ncr.ncr_name,
724                                                          config, 1);
725                         } else {
726                                 nodemap = nodemap_create(rec->ncr.ncr_name,
727                                                          config, 0);
728                         }
729                         if (IS_ERR(nodemap))
730                                 GOTO(out, rc = PTR_ERR(nodemap));
731
732                         /* we need to override the local ID with the saved ID */
733                         nodemap->nm_id = nodemap_id;
734                         if (nodemap_id > config->nmc_nodemap_highest_id)
735                                 config->nmc_nodemap_highest_id = nodemap_id;
736
737                 } else if (nodemap->nm_id != nodemap_id) {
738                         nodemap_putref(nodemap);
739                         GOTO(out, rc = -EINVAL);
740                 }
741
742                 nodemap->nm_squash_uid =
743                                 le32_to_cpu(rec->ncr.ncr_squash_uid);
744                 nodemap->nm_squash_gid =
745                                 le32_to_cpu(rec->ncr.ncr_squash_gid);
746
747                 flags = le32_to_cpu(rec->ncr.ncr_flags);
748                 nodemap->nmf_allow_root_access =
749                                         flags & NM_FL_ALLOW_ROOT_ACCESS;
750                 nodemap->nmf_trust_client_ids =
751                                         flags & NM_FL_TRUST_CLIENT_IDS;
752                 nodemap->nmf_deny_unknown =
753                                         flags & NM_FL_DENY_UNKNOWN;
754                 nodemap->nmf_map_uid_only =
755                                         flags & NM_FL_MAP_UID_ONLY;
756                 nodemap->nmf_map_gid_only =
757                                         flags & NM_FL_MAP_GID_ONLY;
758                 nodemap->nmf_enable_audit =
759                                         flags & NM_FL_ENABLE_AUDIT;
760
761                 /* The fileset should be saved otherwise it will be empty
762                  * every time in case of "NODEMAP_CLUSTER_IDX". */
763                 mutex_lock(&active_config_lock);
764                 old_nm = nodemap_lookup(rec->ncr.ncr_name);
765                 if (!IS_ERR(old_nm) && old_nm->nm_fileset[0] != '\0')
766                         strlcpy(nodemap->nm_fileset, old_nm->nm_fileset,
767                                 sizeof(nodemap->nm_fileset));
768                 mutex_unlock(&active_config_lock);
769                 if (!IS_ERR(old_nm))
770                         nodemap_putref(old_nm);
771
772                 if (*recent_nodemap == NULL) {
773                         *recent_nodemap = nodemap;
774                         INIT_LIST_HEAD(&nodemap->nm_list);
775                 } else {
776                         list_add(&nodemap->nm_list,
777                                  &(*recent_nodemap)->nm_list);
778                 }
779                 nodemap_putref(nodemap);
780                 break;
781         }
782         case NODEMAP_RANGE_IDX:
783                 nid[0] = le64_to_cpu(rec->nrr.nrr_start_nid);
784                 nid[1] = le64_to_cpu(rec->nrr.nrr_end_nid);
785
786                 rc = nodemap_add_range_helper(config, nodemap, nid,
787                                         le32_to_cpu(key->nk_range_id));
788                 if (rc != 0)
789                         GOTO(out, rc);
790                 break;
791         case NODEMAP_UIDMAP_IDX:
792         case NODEMAP_GIDMAP_IDX:
793                 map[0] = le32_to_cpu(key->nk_id_client);
794                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
795
796                 if (type == NODEMAP_UIDMAP_IDX)
797                         id_type = NODEMAP_UID;
798                 else
799                         id_type = NODEMAP_GID;
800
801                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
802                 if (rc != 0)
803                         GOTO(out, rc);
804                 break;
805         case NODEMAP_GLOBAL_IDX:
806                 config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
807                 break;
808         default:
809                 CERROR("got keyrec pair for unknown type %d\n", type);
810                 break;
811         }
812
813         rc = type;
814
815         EXIT;
816
817 out:
818         return rc;
819 }
820
821 enum nm_config_passes {
822         NM_READ_CLUSTERS = 0,
823         NM_READ_ATTRIBUTES = 1,
824 };
825
826 static int nodemap_load_entries(const struct lu_env *env,
827                                 struct dt_object *nodemap_idx)
828 {
829         const struct dt_it_ops *iops;
830         struct dt_it *it;
831         struct lu_nodemap *recent_nodemap = NULL;
832         struct nodemap_config *new_config = NULL;
833         u64 hash = 0;
834         bool activate_nodemap = false;
835         bool loaded_global_idx = false;
836         enum nm_config_passes cur_pass = NM_READ_CLUSTERS;
837         int rc = 0;
838
839         ENTRY;
840
841         iops = &nodemap_idx->do_index_ops->dio_it;
842
843         dt_read_lock(env, nodemap_idx, 0);
844         it = iops->init(env, nodemap_idx, 0);
845         if (IS_ERR(it))
846                 GOTO(out, rc = PTR_ERR(it));
847
848         rc = iops->load(env, it, hash);
849         if (rc < 0)
850                 GOTO(out_iops_fini, rc);
851
852         /* rc == 0 means we need to advance to record */
853         if (rc == 0) {
854                 rc = iops->next(env, it);
855
856                 if (rc < 0)
857                         GOTO(out_iops_put, rc);
858                 /* rc > 0 is eof, will be checked in while below */
859         } else {
860                 /* rc == 1, we found initial record and can process below */
861                 rc = 0;
862         }
863
864         new_config = nodemap_config_alloc();
865         if (IS_ERR(new_config)) {
866                 rc = PTR_ERR(new_config);
867                 new_config = NULL;
868                 GOTO(out_iops_put, rc);
869         }
870
871         /* rc > 0 is eof, check initial iops->next here as well */
872         while (rc == 0) {
873                 struct nodemap_key *key;
874                 union nodemap_rec rec;
875                 enum nodemap_idx_type key_type;
876
877                 key = (struct nodemap_key *)iops->key(env, it);
878                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
879                 if ((cur_pass == NM_READ_CLUSTERS &&
880                                 key_type == NODEMAP_CLUSTER_IDX) ||
881                     (cur_pass == NM_READ_ATTRIBUTES &&
882                                 key_type != NODEMAP_CLUSTER_IDX &&
883                                 key_type != NODEMAP_EMPTY_IDX)) {
884                         rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
885                         if (rc != -ESTALE) {
886                                 if (rc != 0)
887                                         GOTO(out_nodemap_config, rc);
888                                 rc = nodemap_process_keyrec(new_config, key, &rec,
889                                                             &recent_nodemap);
890                                 if (rc < 0)
891                                         GOTO(out_nodemap_config, rc);
892                                 if (rc == NODEMAP_GLOBAL_IDX)
893                                         loaded_global_idx = true;
894                         }
895                 }
896
897                 do
898                         rc = iops->next(env, it);
899                 while (rc == -ESTALE);
900
901                 /* move to second pass */
902                 if (rc > 0 && cur_pass == NM_READ_CLUSTERS) {
903                         cur_pass = NM_READ_ATTRIBUTES;
904                         rc = iops->load(env, it, 0);
905                         if (rc == 0)
906                                 rc = iops->next(env, it);
907                         else if (rc > 0)
908                                 rc = 0;
909                         else
910                                 GOTO(out, rc);
911                 }
912         }
913
914         if (rc > 0)
915                 rc = 0;
916
917 out_nodemap_config:
918         if (rc != 0)
919                 nodemap_config_dealloc(new_config);
920         else
921                 /* creating new default needs to be done outside dt read lock */
922                 activate_nodemap = true;
923 out_iops_put:
924         iops->put(env, it);
925 out_iops_fini:
926         iops->fini(env, it);
927 out:
928         dt_read_unlock(env, nodemap_idx);
929
930         if (rc != 0)
931                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
932                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
933
934         if (!activate_nodemap)
935                 RETURN(rc);
936
937         if (new_config->nmc_default_nodemap == NULL) {
938                 /* new MGS won't have a default nm on disk, so create it here */
939                 struct lu_nodemap *nodemap =
940                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
941                 if (IS_ERR(nodemap)) {
942                         rc = PTR_ERR(nodemap);
943                 } else {
944                         rc = nodemap_idx_nodemap_add_update(
945                                         new_config->nmc_default_nodemap,
946                                         nodemap_idx,
947                                         NM_ADD);
948                         nodemap_putref(new_config->nmc_default_nodemap);
949                 }
950         }
951
952         /* new nodemap config won't have an active/inactive record */
953         if (rc == 0 && loaded_global_idx == false) {
954                 struct nodemap_key       nk;
955                 union nodemap_rec        nr;
956
957                 nodemap_global_key_init(&nk);
958                 nodemap_global_rec_init(&nr, false);
959                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
960         }
961
962         if (rc == 0)
963                 nodemap_config_set_active(new_config);
964         else
965                 nodemap_config_dealloc(new_config);
966
967         RETURN(rc);
968 }
969
970 /**
971  * Step through active config and write to disk.
972  */
973 struct dt_object *nodemap_save_config_cache(const struct lu_env *env,
974                                             struct dt_device *dev,
975                                             struct local_oid_storage *los)
976 {
977         struct dt_object *o;
978         struct lu_nodemap *nodemap;
979         struct lu_nodemap *nm_tmp;
980         struct lu_nid_range *range;
981         struct lu_nid_range *range_temp;
982         struct lu_idmap *idmap;
983         struct lu_idmap *id_tmp;
984         struct rb_root root;
985         struct nodemap_key nk;
986         union nodemap_rec nr;
987         LIST_HEAD(nodemap_list_head);
988         int rc = 0, rc2;
989
990         ENTRY;
991
992         /* create a new index file to fill with active config */
993         o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
994         if (IS_ERR(o))
995                 RETURN(o);
996
997         mutex_lock(&active_config_lock);
998
999         /* convert hash to list so we don't spin */
1000         cfs_hash_for_each_safe(active_config->nmc_nodemap_hash,
1001                                nm_hash_list_cb, &nodemap_list_head);
1002
1003         list_for_each_entry_safe(nodemap, nm_tmp, &nodemap_list_head, nm_list) {
1004                 nodemap_cluster_key_init(&nk, nodemap->nm_id);
1005                 nodemap_cluster_rec_init(&nr, nodemap);
1006
1007                 rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1008                 if (rc2 < 0) {
1009                         rc = rc2;
1010                         continue;
1011                 }
1012
1013                 down_read(&active_config->nmc_range_tree_lock);
1014                 list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
1015                                          rn_list) {
1016                         lnet_nid_t nid[2] = {
1017                                 range->rn_node.in_extent.start,
1018                                 range->rn_node.in_extent.end
1019                         };
1020                         nodemap_range_key_init(&nk, nodemap->nm_id,
1021                                                range->rn_id);
1022                         nodemap_range_rec_init(&nr, nid);
1023                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1024                         if (rc2 < 0)
1025                                 rc = rc2;
1026                 }
1027                 up_read(&active_config->nmc_range_tree_lock);
1028
1029                 /* we don't need to take nm_idmap_lock because active config
1030                  * lock prevents changes from happening to nodemaps
1031                  */
1032                 root = nodemap->nm_client_to_fs_uidmap;
1033                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1034                                                         id_client_to_fs) {
1035                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
1036                                                idmap->id_client);
1037                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1038                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1039                         if (rc2 < 0)
1040                                 rc = rc2;
1041                 }
1042
1043                 root = nodemap->nm_client_to_fs_gidmap;
1044                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1045                                                         id_client_to_fs) {
1046                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
1047                                                idmap->id_client);
1048                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1049                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1050                         if (rc2 < 0)
1051                                 rc = rc2;
1052                 }
1053         }
1054         nodemap_global_key_init(&nk);
1055         nodemap_global_rec_init(&nr, active_config->nmc_nodemap_is_active);
1056         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1057         if (rc2 < 0)
1058                 rc = rc2;
1059
1060         mutex_unlock(&active_config_lock);
1061
1062         if (rc < 0) {
1063                 dt_object_put(env, o);
1064                 o = ERR_PTR(rc);
1065         }
1066
1067         RETURN(o);
1068 }
1069
1070 static void nodemap_save_all_caches(void)
1071 {
1072         struct nm_config_file   *ncf;
1073         struct lu_env            env;
1074         int                      rc = 0;
1075
1076         /* recreating nodemap cache requires fld_thread_key be in env */
1077         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
1078         if (rc != 0) {
1079                 CWARN("cannot init env for nodemap config: rc = %d\n", rc);
1080                 return;
1081         }
1082
1083         mutex_lock(&ncf_list_lock);
1084         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
1085                 struct dt_device *dev = lu2dt_dev(ncf->ncf_obj->do_lu.lo_dev);
1086                 struct obd_device *obd = ncf->ncf_obj->do_lu.lo_dev->ld_obd;
1087                 struct dt_object *o;
1088
1089                 /* put current config file so save conf can rewrite it */
1090                 dt_object_put_nocache(&env, ncf->ncf_obj);
1091                 ncf->ncf_obj = NULL;
1092
1093                 o = nodemap_save_config_cache(&env, dev, ncf->ncf_los);
1094                 if (IS_ERR(o))
1095                         CWARN("%s: error writing to nodemap config: rc = %d\n",
1096                               obd->obd_name, rc);
1097                 else
1098                         ncf->ncf_obj = o;
1099         }
1100         mutex_unlock(&ncf_list_lock);
1101
1102         lu_env_fini(&env);
1103 }
1104
1105 /* tracks if config still needs to be loaded, either from disk or network */
1106 static bool nodemap_config_loaded;
1107 static DEFINE_MUTEX(nodemap_config_loaded_lock);
1108
1109 /**
1110  * Ensures that configs loaded over the wire are prioritized over those loaded
1111  * from disk.
1112  *
1113  * \param config        config to set as the active config
1114  */
1115 void nodemap_config_set_active_mgc(struct nodemap_config *config)
1116 {
1117         mutex_lock(&nodemap_config_loaded_lock);
1118         nodemap_config_set_active(config);
1119         nodemap_config_loaded = true;
1120         nodemap_save_all_caches();
1121         mutex_unlock(&nodemap_config_loaded_lock);
1122 }
1123 EXPORT_SYMBOL(nodemap_config_set_active_mgc);
1124
1125 /**
1126  * Register a dt_object representing the config index file. This should be
1127  * called by targets in order to load the nodemap configuration from disk. The
1128  * dt_object should be created with local_index_find_or_create and the index
1129  * features should be enabled with do_index_try.
1130  *
1131  * \param obj   dt_object returned by local_index_find_or_create
1132  *
1133  * \retval      on success: nm_config_file handle for later deregistration
1134  * \retval      -ENOMEM         memory allocation failure
1135  * \retval      -ENOENT         error loading nodemap config
1136  * \retval      -EINVAL         error loading nodemap config
1137  * \retval      -EEXIST         nodemap config already registered for MGS
1138  */
1139 struct nm_config_file *nm_config_file_register_mgs(const struct lu_env *env,
1140                                                    struct dt_object *obj,
1141                                                    struct local_oid_storage *los)
1142 {
1143         struct nm_config_file *ncf;
1144         int rc = 0;
1145         ENTRY;
1146
1147         if (nodemap_mgs_ncf != NULL)
1148                 GOTO(out, ncf = ERR_PTR(-EEXIST));
1149
1150         OBD_ALLOC_PTR(ncf);
1151         if (ncf == NULL)
1152                 GOTO(out, ncf = ERR_PTR(-ENOMEM));
1153
1154         /* if loading from cache, prevent activation of MGS config until cache
1155          * loading is done, so disk config is overwritten by MGS config.
1156          */
1157         mutex_lock(&nodemap_config_loaded_lock);
1158         rc = nodemap_load_entries(env, obj);
1159         if (!rc)
1160                 nodemap_config_loaded = true;
1161         mutex_unlock(&nodemap_config_loaded_lock);
1162
1163         if (rc) {
1164                 OBD_FREE_PTR(ncf);
1165                 GOTO(out, ncf = ERR_PTR(rc));
1166         }
1167
1168         lu_object_get(&obj->do_lu);
1169
1170         ncf->ncf_obj = obj;
1171         ncf->ncf_los = los;
1172
1173         nodemap_mgs_ncf = ncf;
1174
1175 out:
1176         return ncf;
1177 }
1178 EXPORT_SYMBOL(nm_config_file_register_mgs);
1179
1180 struct nm_config_file *nm_config_file_register_tgt(const struct lu_env *env,
1181                                                    struct dt_device *dev,
1182                                                    struct local_oid_storage *los)
1183 {
1184         struct nm_config_file *ncf;
1185         struct dt_object *config_obj = NULL;
1186         int rc = 0;
1187
1188         OBD_ALLOC_PTR(ncf);
1189         if (ncf == NULL)
1190                 RETURN(ERR_PTR(-ENOMEM));
1191
1192         /* don't load from cache if config already loaded */
1193         mutex_lock(&nodemap_config_loaded_lock);
1194         if (!nodemap_config_loaded) {
1195                 config_obj = nodemap_cache_find_create(env, dev, los, 0);
1196                 if (IS_ERR(config_obj))
1197                         rc = PTR_ERR(config_obj);
1198                 else
1199                         rc = nodemap_load_entries(env, config_obj);
1200
1201                 if (!rc)
1202                         nodemap_config_loaded = true;
1203         }
1204         mutex_unlock(&nodemap_config_loaded_lock);
1205         if (rc)
1206                 GOTO(out_ncf, rc);
1207
1208         /* sync on disk caches w/ loaded config in memory, ncf_obj may change */
1209         if (!config_obj) {
1210                 config_obj = nodemap_save_config_cache(env, dev, los);
1211                 if (IS_ERR(config_obj))
1212                         GOTO(out_ncf, rc = PTR_ERR(config_obj));
1213         }
1214
1215         ncf->ncf_obj = config_obj;
1216         ncf->ncf_los = los;
1217
1218         mutex_lock(&ncf_list_lock);
1219         list_add(&ncf->ncf_list, &ncf_list_head);
1220         mutex_unlock(&ncf_list_lock);
1221
1222 out_ncf:
1223         if (rc) {
1224                 OBD_FREE_PTR(ncf);
1225                 RETURN(ERR_PTR(rc));
1226         }
1227
1228         RETURN(ncf);
1229 }
1230 EXPORT_SYMBOL(nm_config_file_register_tgt);
1231
1232 /**
1233  * Deregister a nm_config_file. Should be called by targets during cleanup.
1234  *
1235  * \param ncf   config file to deregister
1236  */
1237 void nm_config_file_deregister_mgs(const struct lu_env *env,
1238                                    struct nm_config_file *ncf)
1239 {
1240         ENTRY;
1241         LASSERT(nodemap_mgs_ncf == ncf);
1242
1243         nodemap_mgs_ncf = NULL;
1244         if (ncf->ncf_obj)
1245                 dt_object_put(env, ncf->ncf_obj);
1246
1247         OBD_FREE_PTR(ncf);
1248
1249         EXIT;
1250 }
1251 EXPORT_SYMBOL(nm_config_file_deregister_mgs);
1252
1253 void nm_config_file_deregister_tgt(const struct lu_env *env,
1254                                    struct nm_config_file *ncf)
1255 {
1256         ENTRY;
1257
1258         if (ncf == NULL)
1259                 return;
1260
1261         mutex_lock(&ncf_list_lock);
1262         list_del(&ncf->ncf_list);
1263         mutex_unlock(&ncf_list_lock);
1264
1265         if (ncf->ncf_obj)
1266                 dt_object_put(env, ncf->ncf_obj);
1267
1268         OBD_FREE_PTR(ncf);
1269
1270         EXIT;
1271 }
1272 EXPORT_SYMBOL(nm_config_file_deregister_tgt);
1273
1274 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
1275                               struct lu_nodemap **recent_nodemap)
1276 {
1277         struct nodemap_key *key;
1278         union nodemap_rec *rec;
1279         char *entry;
1280         int j;
1281         int k;
1282         int rc = 0;
1283         int size = dt_nodemap_features.dif_keysize_max +
1284                    dt_nodemap_features.dif_recsize_max;
1285         ENTRY;
1286
1287         for (j = 0; j < LU_PAGE_COUNT; j++) {
1288                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
1289                         return -EINVAL;
1290
1291                 /* get and process keys and records from page */
1292                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
1293                         entry = lip->lp_idx.lip_entries + k * size;
1294                         key = (struct nodemap_key *)entry;
1295
1296                         entry += dt_nodemap_features.dif_keysize_max;
1297                         rec = (union nodemap_rec *)entry;
1298
1299                         rc = nodemap_process_keyrec(config, key, rec,
1300                                                     recent_nodemap);
1301                         if (rc < 0)
1302                                 return rc;
1303                 }
1304                 lip++;
1305         }
1306
1307         EXIT;
1308         return 0;
1309 }
1310 EXPORT_SYMBOL(nodemap_process_idx_pages);
1311
1312 static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
1313                               size_t nob, const struct dt_it_ops *iops,
1314                               struct dt_it *it, __u32 attr, void *arg)
1315 {
1316         struct idx_info *ii = (struct idx_info *)arg;
1317         struct lu_idxpage *lip = &lp->lp_idx;
1318         char *entry;
1319         size_t size = ii->ii_keysize + ii->ii_recsize;
1320         int rc;
1321         ENTRY;
1322
1323         if (nob < LIP_HDR_SIZE)
1324                 return -EINVAL;
1325
1326         /* initialize the header of the new container */
1327         memset(lip, 0, LIP_HDR_SIZE);
1328         lip->lip_magic = LIP_MAGIC;
1329         nob           -= LIP_HDR_SIZE;
1330
1331         entry = lip->lip_entries;
1332         do {
1333                 char            *tmp_entry = entry;
1334                 struct dt_key   *key;
1335                 __u64           hash;
1336                 enum nodemap_idx_type key_type;
1337
1338                 /* fetch 64-bit hash value */
1339                 hash = iops->store(env, it);
1340                 ii->ii_hash_end = hash;
1341
1342                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
1343                         if (lip->lip_nr != 0)
1344                                 GOTO(out, rc = 0);
1345                 }
1346
1347                 if (nob < size) {
1348                         if (lip->lip_nr == 0)
1349                                 GOTO(out, rc = -EINVAL);
1350                         GOTO(out, rc = 0);
1351                 }
1352
1353                 key = iops->key(env, it);
1354                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1355
1356                 /* on the first pass, get only the cluster types. On second
1357                  * pass, get all the rest */
1358                 if ((ii->ii_attrs == NM_READ_CLUSTERS &&
1359                                 key_type == NODEMAP_CLUSTER_IDX) ||
1360                     (ii->ii_attrs == NM_READ_ATTRIBUTES &&
1361                                 key_type != NODEMAP_CLUSTER_IDX &&
1362                                 key_type != NODEMAP_EMPTY_IDX)) {
1363                         memcpy(tmp_entry, key, ii->ii_keysize);
1364                         tmp_entry += ii->ii_keysize;
1365
1366                         /* and finally the record */
1367                         rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
1368                                        attr);
1369                         if (rc != -ESTALE) {
1370                                 if (rc != 0)
1371                                         GOTO(out, rc);
1372
1373                                 /* hash/key/record successfully copied! */
1374                                 lip->lip_nr++;
1375                                 if (unlikely(lip->lip_nr == 1 &&
1376                                     ii->ii_count == 0))
1377                                         ii->ii_hash_start = hash;
1378
1379                                 entry = tmp_entry + ii->ii_recsize;
1380                                 nob -= size;
1381                         }
1382                 }
1383
1384                 /* move on to the next record */
1385                 do {
1386                         rc = iops->next(env, it);
1387                 } while (rc == -ESTALE);
1388
1389                 /* move to second pass */
1390                 if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
1391                         ii->ii_attrs = NM_READ_ATTRIBUTES;
1392                         rc = iops->load(env, it, 0);
1393                         if (rc == 0)
1394                                 rc = iops->next(env, it);
1395                         else if (rc > 0)
1396                                 rc = 0;
1397                         else
1398                                 GOTO(out, rc);
1399                 }
1400
1401         } while (rc == 0);
1402
1403         GOTO(out, rc);
1404 out:
1405         if (rc >= 0 && lip->lip_nr > 0)
1406                 /* one more container */
1407                 ii->ii_count++;
1408         if (rc > 0)
1409                 /* no more entries */
1410                 ii->ii_hash_end = II_END_OFF;
1411         return rc;
1412 }
1413
1414
1415 int nodemap_index_read(struct lu_env *env,
1416                        struct nm_config_file *ncf,
1417                        struct idx_info *ii,
1418                        const struct lu_rdpg *rdpg)
1419 {
1420         struct dt_object        *nodemap_idx = ncf->ncf_obj;
1421         __u64                    version;
1422         int                      rc = 0;
1423
1424         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
1425         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
1426
1427         dt_read_lock(env, nodemap_idx, 0);
1428         version = dt_version_get(env, nodemap_idx);
1429         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
1430                 CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
1431                        ii->ii_version,
1432                        version);
1433                 ii->ii_hash_end = 0;
1434         } else {
1435                 rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
1436                                    ii);
1437                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
1438         }
1439
1440         if (rc >= 0)
1441                 ii->ii_version = version;
1442
1443         dt_read_unlock(env, nodemap_idx);
1444         return rc;
1445 }
1446 EXPORT_SYMBOL(nodemap_index_read);
1447
1448 /**
1449  * Returns the current nodemap configuration to MGC by walking the nodemap
1450  * config index and storing it in the response buffer.
1451  *
1452  * \param       req             incoming MGS_CONFIG_READ request
1453  * \retval      0               success
1454  * \retval      -EINVAL         malformed request
1455  * \retval      -ENOTCONN       client evicted/reconnected already
1456  * \retval      -ETIMEDOUT      client timeout or network error
1457  * \retval      -ENOMEM
1458  */
1459 int nodemap_get_config_req(struct obd_device *mgs_obd,
1460                            struct ptlrpc_request *req)
1461 {
1462         const struct ptlrpc_bulk_frag_ops *frag_ops = &ptlrpc_bulk_kiov_pin_ops;
1463         struct mgs_config_body *body;
1464         struct mgs_config_res *res;
1465         struct lu_rdpg rdpg;
1466         struct idx_info nodemap_ii;
1467         struct ptlrpc_bulk_desc *desc;
1468         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1469         int i;
1470         int page_count;
1471         int bytes = 0;
1472         int rc = 0;
1473
1474         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1475         if (!body)
1476                 RETURN(-EINVAL);
1477
1478         if (body->mcb_type != CONFIG_T_NODEMAP)
1479                 RETURN(-EINVAL);
1480
1481         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1482         rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
1483                 PAGE_SHIFT;
1484         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1485                 RETURN(-EINVAL);
1486
1487         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1488                body->mcb_name, rdpg.rp_count);
1489
1490         /* allocate pages to store the containers */
1491         OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
1492         if (rdpg.rp_pages == NULL)
1493                 RETURN(-ENOMEM);
1494         for (i = 0; i < rdpg.rp_npages; i++) {
1495                 rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
1496                 if (rdpg.rp_pages[i] == NULL)
1497                         GOTO(out, rc = -ENOMEM);
1498         }
1499
1500         rdpg.rp_hash = body->mcb_offset;
1501         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1502         nodemap_ii.ii_flags = II_FL_NOHASH;
1503         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1504         nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
1505
1506         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1507                                    mgs_obd->u.obt.obt_nodemap_config_file,
1508                                    &nodemap_ii, &rdpg);
1509         if (bytes < 0)
1510                 GOTO(out, rc = bytes);
1511
1512         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1513
1514         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1515         if (res == NULL)
1516                 GOTO(out, rc = -EINVAL);
1517         res->mcr_offset = nodemap_ii.ii_hash_end;
1518         res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
1519
1520         page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1521         LASSERT(page_count <= rdpg.rp_count);
1522         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1523                                     PTLRPC_BULK_PUT_SOURCE |
1524                                         PTLRPC_BULK_BUF_KIOV,
1525                                     MGS_BULK_PORTAL, frag_ops);
1526         if (desc == NULL)
1527                 GOTO(out, rc = -ENOMEM);
1528
1529         for (i = 0; i < page_count && bytes > 0; i++) {
1530                 frag_ops->add_kiov_frag(desc, rdpg.rp_pages[i], 0,
1531                                         min_t(int, bytes, PAGE_SIZE));
1532                 bytes -= PAGE_SIZE;
1533         }
1534
1535         rc = target_bulk_io(req->rq_export, desc);
1536         ptlrpc_free_bulk(desc);
1537
1538 out:
1539         if (rdpg.rp_pages != NULL) {
1540                 for (i = 0; i < rdpg.rp_npages; i++)
1541                         if (rdpg.rp_pages[i] != NULL)
1542                                 __free_page(rdpg.rp_pages[i]);
1543                 OBD_FREE(rdpg.rp_pages,
1544                          rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
1545         }
1546         return rc;
1547 }
1548 EXPORT_SYMBOL(nodemap_get_config_req);