Whamcloud - gitweb
LU-13307 nodemap: have nodemap_add_member support large NIDs
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2017, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <uapi/linux/lnet/lnet-types.h>
54 #include <uapi/linux/lustre/lustre_idl.h>
55 #include <uapi/linux/lustre/lustre_disk.h>
56 #include <dt_object.h>
57 #include <lu_object.h>
58 #include <lustre_net.h>
59 #include <lustre_nodemap.h>
60 #include <obd_class.h>
61 #include <obd_support.h>
62 #include "nodemap_internal.h"
63
/* registered nodemap index files; the MGS index is tracked separately */
static LIST_HEAD(ncf_list_head);
/* presumably serializes access to ncf_list_head -- confirm against users */
static DEFINE_MUTEX(ncf_list_lock);

/*
 * The MGS index is kept apart from the list above: the other indexes are
 * listeners to the MGS index.
 */
static struct nm_config_file *nodemap_mgs_ncf;
70
71 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id,
72                                      enum nodemap_cluster_rec_subid subid)
73 {
74         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
75                                                         NODEMAP_CLUSTER_IDX));
76         nk->nk_cluster_subid = subid;
77 }
78
79 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
80                                      const struct lu_nodemap *nodemap)
81 {
82         BUILD_BUG_ON(sizeof(nr->ncr.ncr_name) != sizeof(nodemap->nm_name));
83
84         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nr->ncr.ncr_name));
85         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
86         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
87         nr->ncr.ncr_squash_projid = cpu_to_le32(nodemap->nm_squash_projid);
88         nr->ncr.ncr_flags =
89                 (nodemap->nmf_trust_client_ids ?
90                         NM_FL_TRUST_CLIENT_IDS : 0) |
91                 (nodemap->nmf_allow_root_access ?
92                         NM_FL_ALLOW_ROOT_ACCESS : 0) |
93                 (nodemap->nmf_deny_unknown ?
94                         NM_FL_DENY_UNKNOWN : 0) |
95                 (nodemap->nmf_map_mode & NODEMAP_MAP_UID ?
96                         NM_FL_MAP_UID : 0) |
97                 (nodemap->nmf_map_mode & NODEMAP_MAP_GID ?
98                         NM_FL_MAP_GID : 0) |
99                 (nodemap->nmf_map_mode & NODEMAP_MAP_PROJID ?
100                         NM_FL_MAP_PROJID : 0) |
101                 (nodemap->nmf_enable_audit ?
102                         NM_FL_ENABLE_AUDIT : 0) |
103                 (nodemap->nmf_forbid_encryption ?
104                         NM_FL_FORBID_ENCRYPT : 0);
105         nr->ncr.ncr_flags2 =
106                 (nodemap->nmf_readonly_mount ?
107                         NM_FL2_READONLY_MOUNT : 0);
108 }
109
110 static void nodemap_cluster_roles_rec_init(union nodemap_rec *nr,
111                                            const struct lu_nodemap *nodemap)
112 {
113         struct nodemap_cluster_roles_rec *ncrr = &nr->ncrr;
114
115         memset(ncrr, 0, sizeof(struct nodemap_cluster_roles_rec));
116         ncrr->ncrr_roles = cpu_to_le64(nodemap->nmf_rbac);
117 }
118
119 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
120                                    enum nodemap_id_type id_type,
121                                    u32 id_client)
122 {
123         enum nodemap_idx_type idx_type;
124
125         if (id_type == NODEMAP_UID)
126                 idx_type = NODEMAP_UIDMAP_IDX;
127         else if (id_type == NODEMAP_GID)
128                 idx_type = NODEMAP_GIDMAP_IDX;
129         else if (id_type == NODEMAP_PROJID)
130                 idx_type = NODEMAP_PROJIDMAP_IDX;
131         else
132                 idx_type = NODEMAP_EMPTY_IDX;
133
134         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
135         nk->nk_id_client = cpu_to_le32(id_client);
136 }
137
138 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
139 {
140         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
141 }
142
143 static void nodemap_range_key_init(struct nodemap_key *nk,
144                                    enum nodemap_idx_type type,
145                                    unsigned int nm_id, unsigned int rn_id)
146 {
147         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, type));
148         nk->nk_range_id = cpu_to_le32(rn_id);
149 }
150
151 static int nodemap_range_rec_init(union nodemap_rec *nr,
152                                   const struct lu_nid_range *range)
153 {
154         if (range->rn_netmask) {
155                 nr->nrr2.nrr_nid_prefix = range->rn_start;
156                 nr->nrr2.nrr_netmask = range->rn_netmask;
157
158                 if (NID_BYTES(&nr->nrr2.nrr_nid_prefix) >
159                     sizeof(struct lnet_nid))
160                         return -E2BIG;
161         } else {
162                 lnet_nid_t nid4[2];
163
164                 if (!nid_is_nid4(&range->rn_start) ||
165                     !nid_is_nid4(&range->rn_end))
166                         return -EINVAL;
167
168                 nid4[0] = lnet_nid_to_nid4(&range->rn_start);
169                 nid4[1] = lnet_nid_to_nid4(&range->rn_end);
170                 nr->nrr.nrr_start_nid = cpu_to_le64(nid4[0]);
171                 nr->nrr.nrr_end_nid = cpu_to_le64(nid4[1]);
172         }
173
174         return 0;
175 }
176
177 static void nodemap_global_key_init(struct nodemap_key *nk)
178 {
179         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
180         nk->nk_unused = 0;
181 }
182
183 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
184 {
185         nr->ngr.ngr_is_active = active;
186 }
187
188 /* should be called with dt_write lock */
189 static void nodemap_inc_version(const struct lu_env *env,
190                                 struct dt_object *nodemap_idx,
191                                 struct thandle *th)
192 {
193         u64 ver = dt_version_get(env, nodemap_idx);
194         dt_version_set(env, nodemap_idx, ver + 1, th);
195 }
196
/* mode argument of nodemap_cache_find_create() */
enum ncfc_find_create {
        /* remove any existing index and create a fresh one */
        NCFC_CREATE_NEW = 1,
};
200
201 static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
202                                                    struct dt_device *dev,
203                                                    struct local_oid_storage *los,
204                                                    enum ncfc_find_create create_new)
205 {
206         struct lu_fid tfid;
207         struct dt_object *root_obj;
208         struct dt_object *nm_obj;
209         int rc = 0;
210
211         rc = dt_root_get(env, dev, &tfid);
212         if (rc < 0)
213                 GOTO(out, nm_obj = ERR_PTR(rc));
214
215         root_obj = dt_locate(env, dev, &tfid);
216         if (unlikely(IS_ERR(root_obj)))
217                 GOTO(out, nm_obj = root_obj);
218
219         rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
220         if (rc == -ENOENT) {
221                 if (dev->dd_rdonly)
222                         GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
223         } else if (rc) {
224                 GOTO(out_root, nm_obj = ERR_PTR(rc));
225         } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
226                 GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
227         }
228
229 again:
230         /* if loading index fails the first time, create new index */
231         if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
232                 CDEBUG(D_INFO, "removing old index, creating new one\n");
233                 rc = local_object_unlink(env, dev, root_obj,
234                                          LUSTRE_NODEMAP_NAME);
235                 if (rc < 0) {
236                         /* XXX not sure the best way to get obd name. */
237                         CERROR("cannot destroy nodemap index: rc = %d\n",
238                                rc);
239                         GOTO(out_root, nm_obj = ERR_PTR(rc));
240                 }
241         }
242
243         nm_obj = local_index_find_or_create(env, los, root_obj,
244                                                 LUSTRE_NODEMAP_NAME,
245                                                 S_IFREG | S_IRUGO | S_IWUSR,
246                                                 &dt_nodemap_features);
247         if (IS_ERR(nm_obj))
248                 GOTO(out_root, nm_obj);
249
250         if (nm_obj->do_index_ops == NULL) {
251                 rc = nm_obj->do_ops->do_index_try(env, nm_obj,
252                                                       &dt_nodemap_features);
253                 /* even if loading from tgt fails, connecting to MGS will
254                  * rewrite the config
255                  */
256                 if (rc < 0) {
257                         dt_object_put(env, nm_obj);
258
259                         if (create_new == NCFC_CREATE_NEW)
260                                 GOTO(out_root, nm_obj = ERR_PTR(rc));
261
262                         CERROR("cannot load nodemap index from disk, creating "
263                                "new index: rc = %d\n", rc);
264                         create_new = NCFC_CREATE_NEW;
265                         goto again;
266                 }
267         }
268
269 out_root:
270         dt_object_put(env, root_obj);
271 out:
272         return nm_obj;
273 }
274
275 static int nodemap_idx_insert(const struct lu_env *env,
276                               struct dt_object *idx,
277                               const struct nodemap_key *nk,
278                               const union nodemap_rec *nr)
279 {
280         struct thandle *th;
281         struct dt_device *dev = lu2dt_dev(idx->do_lu.lo_dev);
282         int rc;
283
284         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
285
286         th = dt_trans_create(env, dev);
287
288         if (IS_ERR(th))
289                 GOTO(out, rc = PTR_ERR(th));
290
291         rc = dt_declare_insert(env, idx,
292                                (const struct dt_rec *)nr,
293                                (const struct dt_key *)nk, th);
294         if (rc != 0)
295                 GOTO(out, rc);
296
297         rc = dt_declare_version_set(env, idx, th);
298         if (rc != 0)
299                 GOTO(out, rc);
300
301         rc = dt_trans_start_local(env, dev, th);
302         if (rc != 0)
303                 GOTO(out, rc);
304
305         dt_write_lock(env, idx, 0);
306
307         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
308                        (const struct dt_key *)nk, th);
309
310         nodemap_inc_version(env, idx, th);
311         dt_write_unlock(env, idx);
312 out:
313         dt_trans_stop(env, dev, th);
314
315         return rc;
316 }
317
318 static int nodemap_idx_update(const struct lu_env *env,
319                               struct dt_object *idx,
320                               const struct nodemap_key *nk,
321                               const union nodemap_rec *nr)
322 {
323         struct thandle          *th;
324         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
325         int                      rc = 0;
326
327         th = dt_trans_create(env, dev);
328
329         if (IS_ERR(th))
330                 GOTO(out, rc = PTR_ERR(th));
331
332         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
333         if (rc != 0)
334                 GOTO(out, rc);
335
336         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
337                                (const struct dt_key *)nk, th);
338         if (rc != 0)
339                 GOTO(out, rc);
340
341         rc = dt_declare_version_set(env, idx, th);
342         if (rc != 0)
343                 GOTO(out, rc);
344
345         rc = dt_trans_start_local(env, dev, th);
346         if (rc != 0)
347                 GOTO(out, rc);
348
349         dt_write_lock(env, idx, 0);
350
351         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
352         if (rc != 0)
353                 GOTO(out_lock, rc);
354
355         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
356                        (const struct dt_key *)nk, th);
357         if (rc != 0)
358                 GOTO(out_lock, rc);
359
360         nodemap_inc_version(env, idx, th);
361 out_lock:
362         dt_write_unlock(env, idx);
363 out:
364         dt_trans_stop(env, dev, th);
365
366         return rc;
367 }
368
369 static int nodemap_idx_delete(const struct lu_env *env,
370                               struct dt_object *idx,
371                               const struct nodemap_key *nk,
372                               const union nodemap_rec *unused)
373 {
374         struct thandle          *th;
375         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
376         int                      rc = 0;
377
378         th = dt_trans_create(env, dev);
379
380         if (IS_ERR(th))
381                 GOTO(out, rc = PTR_ERR(th));
382
383         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
384         if (rc != 0)
385                 GOTO(out, rc);
386
387         rc = dt_declare_version_set(env, idx, th);
388         if (rc != 0)
389                 GOTO(out, rc);
390
391         rc = dt_trans_start_local(env, dev, th);
392         if (rc != 0)
393                 GOTO(out, rc);
394
395         dt_write_lock(env, idx, 0);
396
397         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
398
399         nodemap_inc_version(env, idx, th);
400
401         dt_write_unlock(env, idx);
402 out:
403         dt_trans_stop(env, dev, th);
404
405         return rc;
406 }
407
/* selects between inserting a new record and replacing an existing one */
enum nm_add_update {
        NM_ADD = 0,             /* insert a new record */
        NM_UPDATE = 1,          /* replace the existing record */
};
412
413 static int nodemap_idx_cluster_add_update(const struct lu_nodemap *nodemap,
414                                           struct dt_object *idx,
415                                           enum nm_add_update update,
416                                           enum nodemap_cluster_rec_subid subid)
417 {
418         struct nodemap_key nk;
419         union nodemap_rec nr;
420         struct lu_env env;
421         int rc = 0;
422
423         ENTRY;
424
425         if (idx == NULL) {
426                 if (nodemap_mgs_ncf == NULL) {
427                         CERROR("cannot add nodemap config to non-existing MGS.\n");
428                         return -EINVAL;
429                 }
430                 idx = nodemap_mgs_ncf->ncf_obj;
431         }
432
433         rc = lu_env_init(&env, LCT_LOCAL);
434         if (rc)
435                 RETURN(rc);
436
437         nodemap_cluster_key_init(&nk, nodemap->nm_id, subid);
438         switch (subid) {
439         case NODEMAP_CLUSTER_REC:
440                 nodemap_cluster_rec_init(&nr, nodemap);
441                 break;
442         case NODEMAP_CLUSTER_ROLES:
443                 nodemap_cluster_roles_rec_init(&nr, nodemap);
444                 break;
445         default:
446                 CWARN("%s: unknown subtype %u\n", nodemap->nm_name, subid);
447                 GOTO(fini, rc = -EINVAL);
448         }
449
450         if (update == NM_UPDATE)
451                 rc = nodemap_idx_update(&env, idx, &nk, &nr);
452         else
453                 rc = nodemap_idx_insert(&env, idx, &nk, &nr);
454
455 fini:
456         lu_env_fini(&env);
457         RETURN(rc);
458 }
459
460 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
461 {
462         return nodemap_idx_cluster_add_update(nodemap, NULL,
463                                               NM_ADD, NODEMAP_CLUSTER_REC);
464 }
465
466 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
467 {
468         return nodemap_idx_cluster_add_update(nodemap, NULL,
469                                               NM_UPDATE, NODEMAP_CLUSTER_REC);
470 }
471
472 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
473 {
474         struct rb_root           root;
475         struct lu_idmap         *idmap;
476         struct lu_idmap         *temp;
477         struct lu_nid_range     *range;
478         struct lu_nid_range     *range_temp;
479         struct nodemap_key       nk;
480         struct lu_env            env;
481         int                      rc = 0;
482         int                      rc2 = 0;
483
484         ENTRY;
485         if (nodemap_mgs_ncf == NULL) {
486                 CERROR("cannot add nodemap config to non-existing MGS.\n");
487                 return -EINVAL;
488         }
489
490         rc = lu_env_init(&env, LCT_LOCAL);
491         if (rc != 0)
492                 RETURN(rc);
493
494         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_ROLES);
495         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
496         if (rc2 < 0 && rc2 != -ENOENT)
497                 rc = rc2;
498
499         root = nodemap->nm_fs_to_client_uidmap;
500         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
501                                                 id_fs_to_client) {
502                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
503                                        idmap->id_client);
504                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
505                                          &nk, NULL);
506                 if (rc2 < 0)
507                         rc = rc2;
508         }
509
510         root = nodemap->nm_client_to_fs_gidmap;
511         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
512                                                 id_client_to_fs) {
513                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
514                                        idmap->id_client);
515                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
516                                          &nk, NULL);
517                 if (rc2 < 0)
518                         rc = rc2;
519         }
520
521         root = nodemap->nm_client_to_fs_projidmap;
522         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
523                                                 id_client_to_fs) {
524                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_PROJID,
525                                        idmap->id_client);
526                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
527                                          &nk, NULL);
528                 if (rc2 < 0)
529                         rc = rc2;
530         }
531
532         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
533                                  rn_list) {
534                 enum nodemap_idx_type type;
535
536                 type = range->rn_netmask ? NODEMAP_NID_MASK_IDX :
537                                            NODEMAP_RANGE_IDX;
538                 nodemap_range_key_init(&nk, type, nodemap->nm_id, range->rn_id);
539                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
540                                          &nk, NULL);
541                 if (rc2 < 0)
542                         rc = rc2;
543         }
544
545         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_REC);
546         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
547         if (rc2 < 0)
548                 rc = rc2;
549
550         lu_env_fini(&env);
551
552         RETURN(rc);
553 }
554
555 int nodemap_idx_cluster_roles_add(const struct lu_nodemap *nodemap)
556 {
557         return nodemap_idx_cluster_add_update(nodemap, NULL, NM_ADD,
558                                               NODEMAP_CLUSTER_ROLES);
559 }
560
561 int nodemap_idx_cluster_roles_update(const struct lu_nodemap *nodemap)
562 {
563         return nodemap_idx_cluster_add_update(nodemap, NULL, NM_UPDATE,
564                                               NODEMAP_CLUSTER_ROLES);
565 }
566
567 int nodemap_idx_cluster_roles_del(const struct lu_nodemap *nodemap)
568 {
569         struct nodemap_key nk;
570         struct lu_env env;
571         int rc = 0;
572
573         ENTRY;
574
575         if (nodemap_mgs_ncf == NULL) {
576                 CERROR("cannot add nodemap config to non-existing MGS.\n");
577                 return -EINVAL;
578         }
579
580         rc = lu_env_init(&env, LCT_LOCAL);
581         if (rc != 0)
582                 RETURN(rc);
583
584         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_ROLES);
585         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
586
587         lu_env_fini(&env);
588         RETURN(rc);
589 }
590
591 int nodemap_idx_range_add(const struct lu_nid_range *range)
592 {
593         struct nodemap_key nk;
594         union nodemap_rec nr;
595         struct lu_env env;
596         int rc = 0;
597
598         ENTRY;
599         if (nodemap_mgs_ncf == NULL) {
600                 CERROR("cannot add nodemap config to non-existing MGS.\n");
601                 return -EINVAL;
602         }
603
604         rc = lu_env_init(&env, LCT_LOCAL);
605         if (rc != 0)
606                 RETURN(rc);
607
608         nodemap_range_key_init(&nk, range->rn_netmask ? NODEMAP_NID_MASK_IDX :
609                                                         NODEMAP_RANGE_IDX,
610                                range->rn_nodemap->nm_id, range->rn_id);
611         rc = nodemap_range_rec_init(&nr, range);
612         if (rc < 0)
613                 goto free_env;
614
615         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
616 free_env:
617         lu_env_fini(&env);
618
619         RETURN(rc);
620 }
621
622 int nodemap_idx_range_del(const struct lu_nid_range *range)
623 {
624         struct nodemap_key       nk;
625         struct lu_env            env;
626         int                      rc = 0;
627         ENTRY;
628
629         if (nodemap_mgs_ncf == NULL) {
630                 CERROR("cannot add nodemap config to non-existing MGS.\n");
631                 return -EINVAL;
632         }
633
634         rc = lu_env_init(&env, LCT_LOCAL);
635         if (rc != 0)
636                 RETURN(rc);
637
638         nodemap_range_key_init(&nk, range->rn_netmask ? NODEMAP_NID_MASK_IDX :
639                                                         NODEMAP_RANGE_IDX,
640                                range->rn_nodemap->nm_id, range->rn_id);
641         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
642         lu_env_fini(&env);
643
644         RETURN(rc);
645 }
646
647 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
648                           enum nodemap_id_type id_type,
649                           const u32 map[2])
650 {
651         struct nodemap_key       nk;
652         union nodemap_rec        nr;
653         struct lu_env            env;
654         int                      rc = 0;
655         ENTRY;
656
657         if (nodemap_mgs_ncf == NULL) {
658                 CERROR("cannot add nodemap config to non-existing MGS.\n");
659                 return -EINVAL;
660         }
661
662         rc = lu_env_init(&env, LCT_LOCAL);
663         if (rc != 0)
664                 RETURN(rc);
665
666         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
667         nodemap_idmap_rec_init(&nr, map[1]);
668
669         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
670         lu_env_fini(&env);
671
672         RETURN(rc);
673 }
674
675 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
676                           enum nodemap_id_type id_type,
677                           const u32 map[2])
678 {
679         struct nodemap_key       nk;
680         struct lu_env            env;
681         int                      rc = 0;
682         ENTRY;
683
684         if (nodemap_mgs_ncf == NULL) {
685                 CERROR("cannot add nodemap config to non-existing MGS.\n");
686                 return -EINVAL;
687         }
688
689         rc = lu_env_init(&env, LCT_LOCAL);
690         if (rc != 0)
691                 RETURN(rc);
692
693         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
694
695         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
696         lu_env_fini(&env);
697
698         RETURN(rc);
699 }
700
701 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
702 {
703         struct nodemap_key       nk;
704         union nodemap_rec        nr;
705         struct lu_env            env;
706         int                      rc = 0;
707         ENTRY;
708
709         if (nodemap_mgs_ncf == NULL) {
710                 CERROR("cannot add nodemap config to non-existing MGS.\n");
711                 return -EINVAL;
712         }
713
714         rc = lu_env_init(&env, LCT_LOCAL);
715         if (rc != 0)
716                 RETURN(rc);
717
718         nodemap_global_key_init(&nk);
719         nodemap_global_rec_init(&nr, value);
720
721         if (update == NM_UPDATE)
722                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
723                                         &nk, &nr);
724         else
725                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
726                                         &nk, &nr);
727
728         lu_env_fini(&env);
729
730         RETURN(rc);
731 }
732
733 int nodemap_idx_nodemap_activate(bool value)
734 {
735         return nodemap_idx_global_add_update(value, NM_UPDATE);
736 }
737
738 static enum nodemap_idx_type nodemap_get_key_type(const struct nodemap_key *key)
739 {
740         u32                      nodemap_id;
741
742         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
743         return nm_idx_get_type(nodemap_id);
744 }
745
746 static int nodemap_get_key_subtype(const struct nodemap_key *key)
747 {
748         enum nodemap_idx_type type = nodemap_get_key_type(key);
749
750         return type == NODEMAP_CLUSTER_IDX ? key->nk_cluster_subid : -1;
751 }
752
753 static int nodemap_cluster_rec_helper(struct nodemap_config *config,
754                                       u32 nodemap_id,
755                                       const union nodemap_rec *rec,
756                                       struct lu_nodemap **recent_nodemap)
757 {
758         struct lu_nodemap *nodemap, *old_nm;
759         enum nm_flag_bits flags;
760         enum nm_flag2_bits flags2;
761
762         nodemap = cfs_hash_lookup(config->nmc_nodemap_hash, rec->ncr.ncr_name);
763         if (nodemap == NULL) {
764                 if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID)
765                         nodemap = nodemap_create(rec->ncr.ncr_name, config, 1);
766                 else
767                         nodemap = nodemap_create(rec->ncr.ncr_name, config, 0);
768                 if (IS_ERR(nodemap))
769                         return PTR_ERR(nodemap);
770
771                 /* we need to override the local ID with the saved ID */
772                 nodemap->nm_id = nodemap_id;
773                 if (nodemap_id > config->nmc_nodemap_highest_id)
774                         config->nmc_nodemap_highest_id = nodemap_id;
775
776         } else if (nodemap->nm_id != nodemap_id) {
777                 nodemap_putref(nodemap);
778                 return -EINVAL;
779         }
780
781         nodemap->nm_squash_uid = le32_to_cpu(rec->ncr.ncr_squash_uid);
782         nodemap->nm_squash_gid = le32_to_cpu(rec->ncr.ncr_squash_gid);
783         nodemap->nm_squash_projid = le32_to_cpu(rec->ncr.ncr_squash_projid);
784
785         flags = rec->ncr.ncr_flags;
786         nodemap->nmf_allow_root_access = flags & NM_FL_ALLOW_ROOT_ACCESS;
787         nodemap->nmf_trust_client_ids = flags & NM_FL_TRUST_CLIENT_IDS;
788         nodemap->nmf_deny_unknown = flags & NM_FL_DENY_UNKNOWN;
789         nodemap->nmf_map_mode =
790                 (flags & NM_FL_MAP_UID ? NODEMAP_MAP_UID : 0) |
791                 (flags & NM_FL_MAP_GID ? NODEMAP_MAP_GID : 0) |
792                 (flags & NM_FL_MAP_PROJID ? NODEMAP_MAP_PROJID : 0);
793         if (nodemap->nmf_map_mode == NODEMAP_MAP_BOTH_LEGACY)
794                 nodemap->nmf_map_mode = NODEMAP_MAP_BOTH;
795         nodemap->nmf_enable_audit = flags & NM_FL_ENABLE_AUDIT;
796         nodemap->nmf_forbid_encryption = flags & NM_FL_FORBID_ENCRYPT;
797         flags2 = rec->ncr.ncr_flags2;
798         nodemap->nmf_readonly_mount = flags2 & NM_FL2_READONLY_MOUNT;
799         /* by default, and in the absence of cluster_roles, grant all roles */
800         nodemap->nmf_rbac = NODEMAP_RBAC_ALL;
801
802         /* The fileset should be saved otherwise it will be empty
803          * every time in case of "NODEMAP_CLUSTER_IDX".
804          */
805         mutex_lock(&active_config_lock);
806         old_nm = nodemap_lookup(rec->ncr.ncr_name);
807         if (!IS_ERR(old_nm) && old_nm->nm_fileset[0] != '\0')
808                 strlcpy(nodemap->nm_fileset, old_nm->nm_fileset,
809                         sizeof(nodemap->nm_fileset));
810         mutex_unlock(&active_config_lock);
811         if (!IS_ERR(old_nm))
812                 nodemap_putref(old_nm);
813
814         if (*recent_nodemap == NULL) {
815                 *recent_nodemap = nodemap;
816                 INIT_LIST_HEAD(&nodemap->nm_list);
817         } else {
818                 list_add(&nodemap->nm_list, &(*recent_nodemap)->nm_list);
819         }
820         nodemap_putref(nodemap);
821
822         return 0;
823 }
824
825 static int nodemap_cluster_roles_helper(struct lu_nodemap *nodemap,
826                                         const union nodemap_rec *rec)
827 {
828         nodemap->nmf_rbac = le64_to_cpu(rec->ncrr.ncrr_roles);
829
830         return 0;
831 }
832
833 /**
834  * Process a key/rec pair and modify the new configuration.
835  *
836  * \param       config          configuration to update with this key/rec data
837  * \param       key             key of the record that was loaded
838  * \param       rec             record that was loaded
839  * \param       recent_nodemap  last referenced nodemap
840  * \retval      type of record processed, see enum #nodemap_idx_type
841  * \retval      -ENOENT         range or map loaded before nodemap record
842  * \retval      -EINVAL         duplicate nodemap cluster records found with
843  *                              different IDs, or nodemap has invalid name
844  * \retval      -ENOMEM
845  */
846 static int nodemap_process_keyrec(struct nodemap_config *config,
847                                   const struct nodemap_key *key,
848                                   const union nodemap_rec *rec,
849                                   struct lu_nodemap **recent_nodemap)
850 {
851         struct lu_nodemap *nodemap = NULL;
852         enum nodemap_idx_type type;
853         enum nodemap_id_type id_type;
854         struct lnet_nid nid[2];
855         int subtype;
856         u32 nodemap_id;
857         u32 map[2];
858         int rc;
859
860         ENTRY;
861
862         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
863
864         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
865         type = nodemap_get_key_type(key);
866         subtype = nodemap_get_key_subtype(key);
867         nodemap_id = nm_idx_set_type(nodemap_id, 0);
868
869         CDEBUG(D_INFO, "found config entry, nm_id %d type %d subtype %d\n",
870                nodemap_id, type, subtype);
871
872         /* find the correct nodemap in the load list */
873         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_NID_MASK_IDX ||
874             type == NODEMAP_UIDMAP_IDX || type == NODEMAP_GIDMAP_IDX ||
875             type == NODEMAP_PROJIDMAP_IDX ||
876             (type == NODEMAP_CLUSTER_IDX && subtype != NODEMAP_CLUSTER_REC)) {
877                 struct lu_nodemap *tmp = NULL;
878
879                 nodemap = *recent_nodemap;
880
881                 if (nodemap == NULL)
882                         GOTO(out, rc = -ENOENT);
883
884                 if (nodemap->nm_id != nodemap_id) {
885                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
886                                 if (tmp->nm_id == nodemap_id) {
887                                         nodemap = tmp;
888                                         break;
889                                 }
890
891                         if (nodemap->nm_id != nodemap_id)
892                                 GOTO(out, rc = -ENOENT);
893                 }
894
895                 /* update most recently used nodemap if necessay */
896                 if (nodemap != *recent_nodemap)
897                         *recent_nodemap = nodemap;
898         }
899
900         switch (type) {
901         case NODEMAP_EMPTY_IDX:
902                 if (nodemap_id != 0)
903                         CWARN("Found nodemap config record without type field, "
904                               " nodemap_id=%d. nodemap config file corrupt?\n",
905                               nodemap_id);
906                 break;
907         case NODEMAP_CLUSTER_IDX:
908                 switch (nodemap_get_key_subtype(key)) {
909                 case NODEMAP_CLUSTER_REC:
910                         rc = nodemap_cluster_rec_helper(config, nodemap_id, rec,
911                                                         recent_nodemap);
912                         if (rc != 0)
913                                 GOTO(out, rc);
914                         break;
915                 case NODEMAP_CLUSTER_ROLES:
916                         rc = nodemap_cluster_roles_helper(nodemap, rec);
917                         if (rc != 0)
918                                 GOTO(out, rc);
919                         break;
920                 default:
921                         CWARN("%s: ignoring keyrec of type %d with subtype %u\n",
922                               nodemap->nm_name, NODEMAP_CLUSTER_IDX,
923                               nodemap_get_key_subtype(key));
924                         break;
925                 }
926                 break;
927         case NODEMAP_RANGE_IDX:
928                 lnet_nid4_to_nid(le64_to_cpu(rec->nrr.nrr_start_nid), &nid[0]);
929                 lnet_nid4_to_nid(le64_to_cpu(rec->nrr.nrr_end_nid), &nid[1]);
930                 rc = nodemap_add_range_helper(config, nodemap, nid, 0,
931                                               le32_to_cpu(key->nk_range_id));
932                 if (rc != 0)
933                         GOTO(out, rc);
934                 break;
935         case NODEMAP_NID_MASK_IDX:
936                 nid[0] = rec->nrr2.nrr_nid_prefix;
937                 nid[1] = rec->nrr2.nrr_nid_prefix;
938                 rc = nodemap_add_range_helper(config, nodemap, nid,
939                                               rec->nrr2.nrr_netmask,
940                                               le32_to_cpu(key->nk_range_id));
941                 if (rc != 0)
942                         GOTO(out, rc);
943                 break;
944         case NODEMAP_UIDMAP_IDX:
945         case NODEMAP_GIDMAP_IDX:
946         case NODEMAP_PROJIDMAP_IDX:
947                 map[0] = le32_to_cpu(key->nk_id_client);
948                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
949
950                 if (type == NODEMAP_UIDMAP_IDX)
951                         id_type = NODEMAP_UID;
952                 else if (type == NODEMAP_GIDMAP_IDX)
953                         id_type = NODEMAP_GID;
954                 else if (type == NODEMAP_PROJIDMAP_IDX)
955                         id_type = NODEMAP_PROJID;
956                 else
957                         GOTO(out, rc = -EINVAL);
958
959                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
960                 if (rc != 0)
961                         GOTO(out, rc);
962                 break;
963         case NODEMAP_GLOBAL_IDX:
964                 switch (key->nk_unused) {
965                 case 0:
966                         config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
967                         break;
968                 default:
969                         CWARN("%s: ignoring keyrec of type %d with subtype %u\n",
970                               recent_nodemap ?
971                                (*recent_nodemap)->nm_name : "nodemap",
972                               NODEMAP_GLOBAL_IDX, key->nk_unused);
973                         break;
974                 }
975                 break;
976         default:
977                 CWARN("%s: ignoring key %u:%u for unknown type %u\n",
978                       recent_nodemap ? (*recent_nodemap)->nm_name : "nodemap",
979                       key->nk_nodemap_id & 0x0FFFFFFF, key->nk_unused, type);
980                 break;
981         }
982
983         rc = type;
984
985         EXIT;
986
987 out:
988         return rc;
989 }
990
/*
 * Passes made over the nodemap index. Records are filtered by pass so that
 * every NODEMAP_CLUSTER_REC record is seen before the records that refer to
 * a nodemap.
 */
enum nm_config_passes {
	/* first pass: only NODEMAP_CLUSTER_REC records are read */
	NM_READ_CLUSTERS	= 0,
	/* second pass: all remaining non-empty records are read */
	NM_READ_ATTRIBUTES	= 1,
};
995
996 static int nodemap_load_entries(const struct lu_env *env,
997                                 struct dt_object *nodemap_idx)
998 {
999         const struct dt_it_ops *iops;
1000         struct dt_it *it;
1001         struct lu_nodemap *recent_nodemap = NULL;
1002         struct nodemap_config *new_config = NULL;
1003         u64 hash = 0;
1004         bool activate_nodemap = false;
1005         bool loaded_global_idx = false;
1006         enum nm_config_passes cur_pass = NM_READ_CLUSTERS;
1007         int rc = 0;
1008
1009         ENTRY;
1010
1011         iops = &nodemap_idx->do_index_ops->dio_it;
1012
1013         dt_read_lock(env, nodemap_idx, 0);
1014         it = iops->init(env, nodemap_idx, 0);
1015         if (IS_ERR(it))
1016                 GOTO(out, rc = PTR_ERR(it));
1017
1018         rc = iops->load(env, it, hash);
1019         if (rc < 0)
1020                 GOTO(out_iops_fini, rc);
1021
1022         /* rc == 0 means we need to advance to record */
1023         if (rc == 0) {
1024                 rc = iops->next(env, it);
1025
1026                 if (rc < 0)
1027                         GOTO(out_iops_put, rc);
1028                 /* rc > 0 is eof, will be checked in while below */
1029         } else {
1030                 /* rc == 1, we found initial record and can process below */
1031                 rc = 0;
1032         }
1033
1034         new_config = nodemap_config_alloc();
1035         if (IS_ERR(new_config)) {
1036                 rc = PTR_ERR(new_config);
1037                 new_config = NULL;
1038                 GOTO(out_iops_put, rc);
1039         }
1040
1041         /* rc > 0 is eof, check initial iops->next here as well */
1042         while (rc == 0) {
1043                 struct nodemap_key *key;
1044                 union nodemap_rec rec;
1045                 enum nodemap_idx_type key_type;
1046                 int sub_type;
1047
1048                 key = (struct nodemap_key *)iops->key(env, it);
1049                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1050                 sub_type = nodemap_get_key_subtype((struct nodemap_key *)key);
1051                 if ((cur_pass == NM_READ_CLUSTERS &&
1052                      key_type == NODEMAP_CLUSTER_IDX &&
1053                      sub_type == NODEMAP_CLUSTER_REC) ||
1054                     (cur_pass == NM_READ_ATTRIBUTES &&
1055                      (key_type != NODEMAP_CLUSTER_IDX ||
1056                       sub_type != NODEMAP_CLUSTER_REC) &&
1057                      key_type != NODEMAP_EMPTY_IDX)) {
1058                         rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
1059                         if (rc != -ESTALE) {
1060                                 if (rc != 0)
1061                                         GOTO(out_nodemap_config, rc);
1062                                 rc = nodemap_process_keyrec(new_config, key, &rec,
1063                                                             &recent_nodemap);
1064                                 if (rc < 0)
1065                                         GOTO(out_nodemap_config, rc);
1066                                 if (rc == NODEMAP_GLOBAL_IDX)
1067                                         loaded_global_idx = true;
1068                         }
1069                 }
1070
1071                 do
1072                         rc = iops->next(env, it);
1073                 while (rc == -ESTALE);
1074
1075                 /* move to second pass */
1076                 if (rc > 0 && cur_pass == NM_READ_CLUSTERS) {
1077                         cur_pass = NM_READ_ATTRIBUTES;
1078                         rc = iops->load(env, it, 0);
1079                         if (rc == 0)
1080                                 rc = iops->next(env, it);
1081                         else if (rc > 0)
1082                                 rc = 0;
1083                         else
1084                                 GOTO(out, rc);
1085                 }
1086         }
1087
1088         if (rc > 0)
1089                 rc = 0;
1090
1091 out_nodemap_config:
1092         if (rc != 0)
1093                 nodemap_config_dealloc(new_config);
1094         else
1095                 /* creating new default needs to be done outside dt read lock */
1096                 activate_nodemap = true;
1097 out_iops_put:
1098         iops->put(env, it);
1099 out_iops_fini:
1100         iops->fini(env, it);
1101 out:
1102         dt_read_unlock(env, nodemap_idx);
1103
1104         if (rc != 0)
1105                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
1106                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
1107
1108         if (!activate_nodemap)
1109                 RETURN(rc);
1110
1111         if (new_config->nmc_default_nodemap == NULL) {
1112                 /* new MGS won't have a default nm on disk, so create it here */
1113                 struct lu_nodemap *nodemap =
1114                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
1115                 if (IS_ERR(nodemap)) {
1116                         rc = PTR_ERR(nodemap);
1117                 } else {
1118                         rc = nodemap_idx_cluster_add_update(
1119                                         new_config->nmc_default_nodemap,
1120                                         nodemap_idx,
1121                                         NM_ADD, NODEMAP_CLUSTER_REC);
1122                         nodemap_putref(new_config->nmc_default_nodemap);
1123                 }
1124         }
1125
1126         /* new nodemap config won't have an active/inactive record */
1127         if (rc == 0 && loaded_global_idx == false) {
1128                 struct nodemap_key       nk;
1129                 union nodemap_rec        nr;
1130
1131                 nodemap_global_key_init(&nk);
1132                 nodemap_global_rec_init(&nr, false);
1133                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
1134         }
1135
1136         if (rc == 0)
1137                 nodemap_config_set_active(new_config);
1138         else
1139                 nodemap_config_dealloc(new_config);
1140
1141         RETURN(rc);
1142 }
1143
1144 /**
1145  * Step through active config and write to disk.
1146  */
1147 static struct dt_object *
1148 nodemap_save_config_cache(const struct lu_env *env,
1149                           struct dt_device *dev,
1150                           struct local_oid_storage *los)
1151 {
1152         struct dt_object *o;
1153         struct lu_nodemap *nodemap;
1154         struct lu_nodemap *nm_tmp;
1155         struct lu_nid_range *range;
1156         struct lu_nid_range *range_temp;
1157         struct lu_idmap *idmap;
1158         struct lu_idmap *id_tmp;
1159         struct rb_root root;
1160         struct nodemap_key nk;
1161         union nodemap_rec nr;
1162         LIST_HEAD(nodemap_list_head);
1163         int rc = 0, rc2;
1164
1165         ENTRY;
1166
1167         /* create a new index file to fill with active config */
1168         o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
1169         if (IS_ERR(o))
1170                 RETURN(o);
1171
1172         mutex_lock(&active_config_lock);
1173
1174         /* convert hash to list so we don't spin */
1175         cfs_hash_for_each_safe(active_config->nmc_nodemap_hash,
1176                                nm_hash_list_cb, &nodemap_list_head);
1177
1178         list_for_each_entry_safe(nodemap, nm_tmp, &nodemap_list_head, nm_list) {
1179                 nodemap_cluster_key_init(&nk, nodemap->nm_id,
1180                                          NODEMAP_CLUSTER_REC);
1181                 nodemap_cluster_rec_init(&nr, nodemap);
1182
1183                 rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1184                 if (rc2 < 0) {
1185                         rc = rc2;
1186                         continue;
1187                 }
1188
1189                 /* only insert NODEMAP_CLUSTER_ROLES idx in saved config cache
1190                  * if nmf_rbac is not default value NODEMAP_RBAC_ALL
1191                  */
1192                 if (nodemap->nmf_rbac != NODEMAP_RBAC_ALL) {
1193                         nodemap_cluster_key_init(&nk, nodemap->nm_id,
1194                                                  NODEMAP_CLUSTER_ROLES);
1195                         nodemap_cluster_roles_rec_init(&nr, nodemap);
1196                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1197                         if (rc2 < 0)
1198                                 rc = rc2;
1199                 }
1200
1201                 down_read(&active_config->nmc_range_tree_lock);
1202                 list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
1203                                          rn_list) {
1204                         enum nodemap_idx_type type;
1205
1206                         type = range->rn_netmask ? NODEMAP_NID_MASK_IDX :
1207                                                    NODEMAP_RANGE_IDX;
1208                         nodemap_range_key_init(&nk, type, nodemap->nm_id,
1209                                                range->rn_id);
1210                         rc2 = nodemap_range_rec_init(&nr, range);
1211                         if (rc2 < 0) {
1212                                 rc = rc2;
1213                                 continue;
1214                         }
1215                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1216                         if (rc2 < 0)
1217                                 rc = rc2;
1218                 }
1219                 up_read(&active_config->nmc_range_tree_lock);
1220
1221                 /* we don't need to take nm_idmap_lock because active config
1222                  * lock prevents changes from happening to nodemaps
1223                  */
1224                 root = nodemap->nm_client_to_fs_uidmap;
1225                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1226                                                         id_client_to_fs) {
1227                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
1228                                                idmap->id_client);
1229                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1230                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1231                         if (rc2 < 0)
1232                                 rc = rc2;
1233                 }
1234
1235                 root = nodemap->nm_client_to_fs_gidmap;
1236                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1237                                                         id_client_to_fs) {
1238                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
1239                                                idmap->id_client);
1240                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1241                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1242                         if (rc2 < 0)
1243                                 rc = rc2;
1244                 }
1245
1246                 root = nodemap->nm_client_to_fs_projidmap;
1247                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1248                                                         id_client_to_fs) {
1249                         nodemap_idmap_key_init(&nk, nodemap->nm_id,
1250                                                NODEMAP_PROJID,
1251                                                idmap->id_client);
1252                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1253                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1254                         if (rc2 < 0)
1255                                 rc = rc2;
1256                 }
1257         }
1258         nodemap_global_key_init(&nk);
1259         nodemap_global_rec_init(&nr, active_config->nmc_nodemap_is_active);
1260         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1261         if (rc2 < 0)
1262                 rc = rc2;
1263
1264         mutex_unlock(&active_config_lock);
1265
1266         if (rc < 0) {
1267                 dt_object_put(env, o);
1268                 o = ERR_PTR(rc);
1269         }
1270
1271         RETURN(o);
1272 }
1273
1274 static void nodemap_save_all_caches(void)
1275 {
1276         struct nm_config_file   *ncf;
1277         struct lu_env            env;
1278         int                      rc = 0;
1279
1280         /* recreating nodemap cache requires fld_thread_key be in env */
1281         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
1282         if (rc != 0) {
1283                 CWARN("cannot init env for nodemap config: rc = %d\n", rc);
1284                 return;
1285         }
1286
1287         mutex_lock(&ncf_list_lock);
1288         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
1289                 struct dt_device *dev = lu2dt_dev(ncf->ncf_obj->do_lu.lo_dev);
1290                 struct obd_device *obd = ncf->ncf_obj->do_lu.lo_dev->ld_obd;
1291                 struct dt_object *o;
1292
1293                 /* put current config file so save conf can rewrite it */
1294                 dt_object_put_nocache(&env, ncf->ncf_obj);
1295                 ncf->ncf_obj = NULL;
1296
1297                 o = nodemap_save_config_cache(&env, dev, ncf->ncf_los);
1298                 if (IS_ERR(o))
1299                         CWARN("%s: error writing to nodemap config: rc = %d\n",
1300                               obd->obd_name, rc);
1301                 else
1302                         ncf->ncf_obj = o;
1303         }
1304         mutex_unlock(&ncf_list_lock);
1305
1306         lu_env_fini(&env);
1307 }
1308
1309 /* tracks if config still needs to be loaded, either from disk or network */
1310 static bool nodemap_config_loaded;
1311 static DEFINE_MUTEX(nodemap_config_loaded_lock);
1312
1313 /**
1314  * Ensures that configs loaded over the wire are prioritized over those loaded
1315  * from disk.
1316  *
1317  * \param config        config to set as the active config
1318  */
1319 void nodemap_config_set_active_mgc(struct nodemap_config *config)
1320 {
1321         mutex_lock(&nodemap_config_loaded_lock);
1322         nodemap_config_set_active(config);
1323         nodemap_config_loaded = true;
1324         nodemap_save_all_caches();
1325         mutex_unlock(&nodemap_config_loaded_lock);
1326 }
1327 EXPORT_SYMBOL(nodemap_config_set_active_mgc);
1328
1329 /**
1330  * Register a dt_object representing the config index file. This should be
1331  * called by targets in order to load the nodemap configuration from disk. The
1332  * dt_object should be created with local_index_find_or_create and the index
1333  * features should be enabled with do_index_try.
1334  *
1335  * \param obj   dt_object returned by local_index_find_or_create
1336  *
1337  * \retval      on success: nm_config_file handle for later deregistration
1338  * \retval      -ENOMEM         memory allocation failure
1339  * \retval      -ENOENT         error loading nodemap config
1340  * \retval      -EINVAL         error loading nodemap config
1341  * \retval      -EEXIST         nodemap config already registered for MGS
1342  */
1343 struct nm_config_file *nm_config_file_register_mgs(const struct lu_env *env,
1344                                                    struct dt_object *obj,
1345                                                    struct local_oid_storage *los)
1346 {
1347         struct nm_config_file *ncf;
1348         int rc = 0;
1349         ENTRY;
1350
1351         if (nodemap_mgs_ncf != NULL)
1352                 GOTO(out, ncf = ERR_PTR(-EEXIST));
1353
1354         OBD_ALLOC_PTR(ncf);
1355         if (ncf == NULL)
1356                 GOTO(out, ncf = ERR_PTR(-ENOMEM));
1357
1358         /* if loading from cache, prevent activation of MGS config until cache
1359          * loading is done, so disk config is overwritten by MGS config.
1360          */
1361         mutex_lock(&nodemap_config_loaded_lock);
1362         rc = nodemap_load_entries(env, obj);
1363         if (!rc)
1364                 nodemap_config_loaded = true;
1365         mutex_unlock(&nodemap_config_loaded_lock);
1366
1367         if (rc) {
1368                 OBD_FREE_PTR(ncf);
1369                 GOTO(out, ncf = ERR_PTR(rc));
1370         }
1371
1372         lu_object_get(&obj->do_lu);
1373
1374         ncf->ncf_obj = obj;
1375         ncf->ncf_los = los;
1376
1377         nodemap_mgs_ncf = ncf;
1378
1379 out:
1380         return ncf;
1381 }
1382 EXPORT_SYMBOL(nm_config_file_register_mgs);
1383
1384 struct nm_config_file *nm_config_file_register_tgt(const struct lu_env *env,
1385                                                    struct dt_device *dev,
1386                                                    struct local_oid_storage *los)
1387 {
1388         struct nm_config_file *ncf;
1389         struct dt_object *config_obj = NULL;
1390         int rc = 0;
1391
1392         OBD_ALLOC_PTR(ncf);
1393         if (ncf == NULL)
1394                 RETURN(ERR_PTR(-ENOMEM));
1395
1396         /* don't load from cache if config already loaded */
1397         mutex_lock(&nodemap_config_loaded_lock);
1398         if (!nodemap_config_loaded) {
1399                 config_obj = nodemap_cache_find_create(env, dev, los, 0);
1400                 if (IS_ERR(config_obj))
1401                         rc = PTR_ERR(config_obj);
1402                 else
1403                         rc = nodemap_load_entries(env, config_obj);
1404
1405                 if (!rc)
1406                         nodemap_config_loaded = true;
1407         }
1408         mutex_unlock(&nodemap_config_loaded_lock);
1409         if (rc)
1410                 GOTO(out_ncf, rc);
1411
1412         /* sync on disk caches w/ loaded config in memory, ncf_obj may change */
1413         if (!config_obj) {
1414                 config_obj = nodemap_save_config_cache(env, dev, los);
1415                 if (IS_ERR(config_obj))
1416                         GOTO(out_ncf, rc = PTR_ERR(config_obj));
1417         }
1418
1419         ncf->ncf_obj = config_obj;
1420         ncf->ncf_los = los;
1421
1422         mutex_lock(&ncf_list_lock);
1423         list_add(&ncf->ncf_list, &ncf_list_head);
1424         mutex_unlock(&ncf_list_lock);
1425
1426 out_ncf:
1427         if (rc) {
1428                 OBD_FREE_PTR(ncf);
1429                 RETURN(ERR_PTR(rc));
1430         }
1431
1432         RETURN(ncf);
1433 }
1434 EXPORT_SYMBOL(nm_config_file_register_tgt);
1435
1436 /**
1437  * Deregister a nm_config_file. Should be called by targets during cleanup.
1438  *
1439  * \param ncf   config file to deregister
1440  */
1441 void nm_config_file_deregister_mgs(const struct lu_env *env,
1442                                    struct nm_config_file *ncf)
1443 {
1444         ENTRY;
1445         LASSERT(nodemap_mgs_ncf == ncf);
1446
1447         nodemap_mgs_ncf = NULL;
1448         if (ncf->ncf_obj)
1449                 dt_object_put(env, ncf->ncf_obj);
1450
1451         OBD_FREE_PTR(ncf);
1452
1453         EXIT;
1454 }
1455 EXPORT_SYMBOL(nm_config_file_deregister_mgs);
1456
1457 void nm_config_file_deregister_tgt(const struct lu_env *env,
1458                                    struct nm_config_file *ncf)
1459 {
1460         ENTRY;
1461
1462         if (ncf == NULL)
1463                 return;
1464
1465         mutex_lock(&ncf_list_lock);
1466         list_del(&ncf->ncf_list);
1467         mutex_unlock(&ncf_list_lock);
1468
1469         if (ncf->ncf_obj)
1470                 dt_object_put(env, ncf->ncf_obj);
1471
1472         OBD_FREE_PTR(ncf);
1473
1474         EXIT;
1475 }
1476 EXPORT_SYMBOL(nm_config_file_deregister_tgt);
1477
1478 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
1479                               struct lu_nodemap **recent_nodemap)
1480 {
1481         struct nodemap_key *key;
1482         union nodemap_rec *rec;
1483         char *entry;
1484         int j;
1485         int k;
1486         int rc = 0;
1487         int size = dt_nodemap_features.dif_keysize_max +
1488                    dt_nodemap_features.dif_recsize_max;
1489         ENTRY;
1490
1491         for (j = 0; j < LU_PAGE_COUNT; j++) {
1492                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
1493                         return -EINVAL;
1494
1495                 /* get and process keys and records from page */
1496                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
1497                         entry = lip->lp_idx.lip_entries + k * size;
1498                         key = (struct nodemap_key *)entry;
1499
1500                         entry += dt_nodemap_features.dif_keysize_max;
1501                         rec = (union nodemap_rec *)entry;
1502
1503                         rc = nodemap_process_keyrec(config, key, rec,
1504                                                     recent_nodemap);
1505                         if (rc < 0)
1506                                 return rc;
1507                 }
1508                 lip++;
1509         }
1510
1511         EXIT;
1512         return 0;
1513 }
1514 EXPORT_SYMBOL(nodemap_process_idx_pages);
1515
1516 static int nodemap_page_build(const struct lu_env *env, struct dt_object *obj,
1517                               union lu_page *lp, size_t bytes,
1518                               const struct dt_it_ops *iops,
1519                               struct dt_it *it, __u32 attr, void *arg)
1520 {
1521         struct idx_info *ii = (struct idx_info *)arg;
1522         struct lu_idxpage *lip = &lp->lp_idx;
1523         char *entry;
1524         size_t size = ii->ii_keysize + ii->ii_recsize;
1525         int rc;
1526         ENTRY;
1527
1528         if (bytes < LIP_HDR_SIZE)
1529                 return -EINVAL;
1530
1531         /* initialize the header of the new container */
1532         memset(lip, 0, LIP_HDR_SIZE);
1533         lip->lip_magic = LIP_MAGIC;
1534         bytes -= LIP_HDR_SIZE;
1535
1536         entry = lip->lip_entries;
1537         do {
1538                 char *tmp_entry = entry;
1539                 struct dt_key *key;
1540                 __u64 hash;
1541                 enum nodemap_idx_type key_type;
1542                 int sub_type;
1543
1544                 /* fetch 64-bit hash value */
1545                 hash = iops->store(env, it);
1546                 ii->ii_hash_end = hash;
1547
1548                 if (CFS_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
1549                         if (lip->lip_nr != 0)
1550                                 GOTO(out, rc = 0);
1551                 }
1552
1553                 if (bytes < size) {
1554                         if (lip->lip_nr == 0)
1555                                 GOTO(out, rc = -EINVAL);
1556                         GOTO(out, rc = 0);
1557                 }
1558
1559                 key = iops->key(env, it);
1560                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1561                 sub_type = nodemap_get_key_subtype((struct nodemap_key *)key);
1562
1563                 /* on the first pass, get only the cluster types. On second
1564                  * pass, get all the rest */
1565                 if ((ii->ii_attrs == NM_READ_CLUSTERS &&
1566                      key_type == NODEMAP_CLUSTER_IDX &&
1567                      sub_type == NODEMAP_CLUSTER_REC) ||
1568                     (ii->ii_attrs == NM_READ_ATTRIBUTES &&
1569                      (key_type != NODEMAP_CLUSTER_IDX ||
1570                       sub_type != NODEMAP_CLUSTER_REC) &&
1571                      key_type != NODEMAP_EMPTY_IDX)) {
1572                         memcpy(tmp_entry, key, ii->ii_keysize);
1573                         tmp_entry += ii->ii_keysize;
1574
1575                         /* and finally the record */
1576                         rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
1577                                        attr);
1578                         if (rc != -ESTALE) {
1579                                 if (rc != 0)
1580                                         GOTO(out, rc);
1581
1582                                 /* hash/key/record successfully copied! */
1583                                 lip->lip_nr++;
1584                                 if (unlikely(lip->lip_nr == 1 &&
1585                                     ii->ii_count == 0))
1586                                         ii->ii_hash_start = hash;
1587
1588                                 entry = tmp_entry + ii->ii_recsize;
1589                                 bytes -= size;
1590                         }
1591                 }
1592
1593                 /* move on to the next record */
1594                 do {
1595                         rc = iops->next(env, it);
1596                 } while (rc == -ESTALE);
1597
1598                 /* move to second pass */
1599                 if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
1600                         ii->ii_attrs = NM_READ_ATTRIBUTES;
1601                         rc = iops->load(env, it, 0);
1602                         if (rc == 0)
1603                                 rc = iops->next(env, it);
1604                         else if (rc > 0)
1605                                 rc = 0;
1606                         else
1607                                 GOTO(out, rc);
1608                 }
1609
1610         } while (rc == 0);
1611
1612         GOTO(out, rc);
1613 out:
1614         if (rc >= 0 && lip->lip_nr > 0)
1615                 /* one more container */
1616                 ii->ii_count++;
1617         if (rc > 0)
1618                 /* no more entries */
1619                 ii->ii_hash_end = II_END_OFF;
1620         return rc;
1621 }
1622
1623 int nodemap_index_read(struct lu_env *env,
1624                        struct nm_config_file *ncf,
1625                        struct idx_info *ii,
1626                        const struct lu_rdpg *rdpg)
1627 {
1628         struct dt_object        *nodemap_idx = ncf->ncf_obj;
1629         __u64                    version;
1630         int                      rc = 0;
1631
1632         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
1633         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
1634
1635         dt_read_lock(env, nodemap_idx, 0);
1636         version = dt_version_get(env, nodemap_idx);
1637         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
1638                 CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
1639                        ii->ii_version,
1640                        version);
1641                 ii->ii_hash_end = 0;
1642         } else {
1643                 rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
1644                                    ii);
1645                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
1646         }
1647
1648         if (rc >= 0)
1649                 ii->ii_version = version;
1650
1651         /*
1652          * For partial lu_idxpage filling of the end system page,
1653          * init the header of the remain lu_idxpages.
1654          */
1655         if (rc > 0)
1656                 dt_index_page_adjust(rdpg->rp_pages, rdpg->rp_npages,
1657                                      ii->ii_count);
1658
1659         dt_read_unlock(env, nodemap_idx);
1660         return rc;
1661 }
1662 EXPORT_SYMBOL(nodemap_index_read);
1663
1664 /**
1665  * Returns the current nodemap configuration to MGC by walking the nodemap
1666  * config index and storing it in the response buffer.
1667  *
1668  * \param       req             incoming MGS_CONFIG_READ request
1669  * \retval      0               success
1670  * \retval      -EINVAL         malformed request
1671  * \retval      -ENOTCONN       client evicted/reconnected already
1672  * \retval      -ETIMEDOUT      client timeout or network error
1673  * \retval      -ENOMEM
1674  */
1675 int nodemap_get_config_req(struct obd_device *mgs_obd,
1676                            struct ptlrpc_request *req)
1677 {
1678         const struct ptlrpc_bulk_frag_ops *frag_ops = &ptlrpc_bulk_kiov_pin_ops;
1679         struct mgs_config_body *body;
1680         struct mgs_config_res *res;
1681         struct lu_rdpg rdpg;
1682         struct idx_info nodemap_ii;
1683         struct ptlrpc_bulk_desc *desc;
1684         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1685         int i;
1686         int page_count;
1687         int bytes = 0;
1688         int rc = 0;
1689
1690         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1691         if (!body)
1692                 RETURN(-EINVAL);
1693
1694         if (body->mcb_type != MGS_CFG_T_NODEMAP)
1695                 RETURN(-EINVAL);
1696
1697         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1698         rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
1699                 PAGE_SHIFT;
1700         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1701                 RETURN(-EINVAL);
1702
1703         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1704                body->mcb_name, rdpg.rp_count);
1705
1706         /* allocate pages to store the containers */
1707         OBD_ALLOC_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1708         if (rdpg.rp_pages == NULL)
1709                 RETURN(-ENOMEM);
1710         for (i = 0; i < rdpg.rp_npages; i++) {
1711                 rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
1712                 if (rdpg.rp_pages[i] == NULL)
1713                         GOTO(out, rc = -ENOMEM);
1714         }
1715
1716         rdpg.rp_hash = body->mcb_offset;
1717         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1718         nodemap_ii.ii_flags = II_FL_NOHASH;
1719         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1720         nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
1721         nodemap_ii.ii_count = 0;
1722
1723         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1724                                    obd2obt(mgs_obd)->obt_nodemap_config_file,
1725                                    &nodemap_ii, &rdpg);
1726         if (bytes < 0)
1727                 GOTO(out, rc = bytes);
1728
1729         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1730
1731         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1732         if (res == NULL)
1733                 GOTO(out, rc = -EINVAL);
1734         res->mcr_offset = nodemap_ii.ii_hash_end;
1735         res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
1736
1737         page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1738         LASSERT(page_count <= rdpg.rp_count);
1739         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1740                                     PTLRPC_BULK_PUT_SOURCE,
1741                                     MGS_BULK_PORTAL, frag_ops);
1742         if (desc == NULL)
1743                 GOTO(out, rc = -ENOMEM);
1744
1745         for (i = 0; i < page_count && bytes > 0; i++) {
1746                 frag_ops->add_kiov_frag(desc, rdpg.rp_pages[i], 0,
1747                                         min_t(int, bytes, PAGE_SIZE));
1748                 bytes -= PAGE_SIZE;
1749         }
1750
1751         rc = target_bulk_io(req->rq_export, desc);
1752         ptlrpc_free_bulk(desc);
1753
1754 out:
1755         if (rdpg.rp_pages != NULL) {
1756                 for (i = 0; i < rdpg.rp_npages; i++)
1757                         if (rdpg.rp_pages[i] != NULL)
1758                                 __free_page(rdpg.rp_pages[i]);
1759                 OBD_FREE_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1760         }
1761         return rc;
1762 }
1763 EXPORT_SYMBOL(nodemap_get_config_req);