Whamcloud - gitweb
LU-6142 tests: Fix style issues for chownmany.c
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* obd_types_lock is taken around every walk or update of the obd_types list */
static DEFINE_SPINLOCK(obd_types_lock);
static LIST_HEAD(obd_types);
/* obd_dev_lock is held (read or write) while obd_devs[] is scanned or changed */
DEFINE_RWLOCK(obd_dev_lock);
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* slab cache for struct obd_device, set up by obd_init_caches() */
static struct kmem_cache *obd_device_cachep;

/* NOTE(review): presumably the workqueue that reaps zombie exports/imports -
 * its users are not visible in this part of the file, confirm. */
static struct workqueue_struct *zombie_wq;

/* forward declarations of helpers defined later in this file */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* list of stale exports, its lock and its element counter */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* function pointer exported for other modules; it is not assigned in the
 * visible part of this file */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
/*
 * kobject release callback for an obd_type (installed through class_ktype).
 *
 * Tears down everything hanging off the type: debugfs entry, lu device type,
 * membership in obd_types, the proc subtree, the copied ops tables and
 * finally the obd_type itself.
 */
static void class_sysfs_release(struct kobject *kobj)
{
        struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);

#ifdef HAVE_SERVER_SUPPORT
        /* for types flagged typ_sym_filter the debugfs pointer is dropped
         * here, so the removal below is not applied to that entry */
        if (type->typ_sym_filter)
                type->typ_debugfs_entry = NULL;
#endif
        debugfs_remove_recursive(type->typ_debugfs_entry);
        type->typ_debugfs_entry = NULL;

        if (type->typ_lu)
                lu_device_type_fini(type->typ_lu);

        /* unlink from the global type list under its lock */
        spin_lock(&obd_types_lock);
        list_del(&type->typ_chain);
        spin_unlock(&obd_types_lock);

#ifdef CONFIG_PROC_FS
        if (type->typ_name && type->typ_procroot)
                remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
        /* the ops tables are private copies made in class_register_type() */
        if (type->typ_md_ops)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops)
                OBD_FREE_PTR(type->typ_dt_ops);

        OBD_FREE(type, sizeof(*type));
}
190
/* kobject type used for every obd_type; dropping the last kobject reference
 * ends up in class_sysfs_release() */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
195
196 #ifdef HAVE_SERVER_SUPPORT
197 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
198 {
199         struct dentry *symlink;
200         struct obd_type *type;
201         struct kobject *kobj;
202         int rc;
203
204         kobj = kset_find_obj(lustre_kset, name);
205         if (kobj) {
206                 kobject_put(kobj);
207                 return ERR_PTR(-EEXIST);
208         }
209
210         OBD_ALLOC(type, sizeof(*type));
211         if (!type)
212                 return ERR_PTR(-ENOMEM);
213
214         INIT_LIST_HEAD(&type->typ_chain);
215
216         type->typ_kobj.kset = lustre_kset;
217         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
218                                   &lustre_kset->kobj, "%s", name);
219         if (rc)
220                 return ERR_PTR(rc);
221
222         symlink = debugfs_create_dir(name, debugfs_lustre_root);
223         if (IS_ERR_OR_NULL(symlink)) {
224                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
225                 kobject_put(&type->typ_kobj);
226                 return ERR_PTR(rc);
227         }
228         type->typ_debugfs_entry = symlink;
229         type->typ_sym_filter = true;
230
231         if (enable_proc) {
232                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
233                                                       NULL, NULL);
234                 if (IS_ERR(type->typ_procroot)) {
235                         CERROR("%s: can't create compat proc entry: %d\n",
236                                name, (int)PTR_ERR(type->typ_procroot));
237                         type->typ_procroot = NULL;
238                 }
239         }
240
241         return type;
242 }
243 EXPORT_SYMBOL(class_add_symlinks);
244 #endif /* HAVE_SERVER_SUPPORT */
245
246 #define CLASS_MAX_NAME 1024
247
/*
 * Register an obd type called \a name.
 *
 * Private copies of \a dt_ops and (optionally) \a md_ops are stored in the
 * new type, proc/debugfs entries and the kobject are created, \a ldt is
 * initialized when given, and the type is finally linked into obd_types.
 *
 * \param[in] dt_ops       obd operations, copied into the type
 * \param[in] md_ops       metadata operations, may be NULL
 * \param[in] enable_proc  create a proc entry (only with CONFIG_PROC_FS)
 * \param[in] vars         passed to ldebugfs_register() for the debugfs dir
 * \param[in] name         type name, must be shorter than CLASS_MAX_NAME
 * \param[in] ldt          lu device type to initialize, may be NULL
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  allocation failure
 * \retval errno    error from proc/debugfs/kobject/ldt setup
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        if (class_search_type(name)) {
#ifdef HAVE_SERVER_SUPPORT
                /* for LOV and OSC an existing kobject of that name is reused
                 * instead of failing with -EEXIST */
                if (strcmp(name, LUSTRE_LOV_NAME) == 0 ||
                    strcmp(name, LUSTRE_OSC_NAME) == 0) {
                        struct kobject *kobj;

                        kobj = kset_find_obj(lustre_kset, name);
                        if (kobj) {
                                type = container_of(kobj, struct obd_type,
                                                    typ_kobj);
                                goto dir_exist;
                        }
                }
#endif /* HAVE_SERVER_SUPPORT */
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        INIT_LIST_HEAD(&type->typ_chain);
        type->typ_kobj.kset = lustre_kset;
        /* from here on the type is released through kobject_put() only */
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL)
                GOTO (failed, rc = -ENOMEM);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        spin_lock_init(&type->obd_type_lock);

#ifdef HAVE_SERVER_SUPPORT
        /* types flagged typ_sym_filter skip the proc/debugfs/kobject setup
         * below and go straight to the ldt initialization */
        if (type->typ_sym_filter)
                goto setup_ldt;
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* do not leave an ERR_PTR behind for the release
                         * callback to act on */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
                                                    vars, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc)
                        GOTO(failed, rc);
        }

        /* make the fully set up type visible to class_search_type() */
        spin_lock(&obd_types_lock);
        list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN(0);

failed:
        /* NOTE(review): cleanup is delegated to class_sysfs_release(), which
         * runs once the last kobject reference is gone - confirm no other
         * reference is outstanding on the dir_exist path */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
349 EXPORT_SYMBOL(class_register_type);
350
351 int class_unregister_type(const char *name)
352 {
353         struct obd_type *type = class_search_type(name);
354         ENTRY;
355
356         if (!type) {
357                 CERROR("unknown obd type\n");
358                 RETURN(-EINVAL);
359         }
360
361         if (type->typ_refcnt) {
362                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
363                 /* This is a bad situation, let's make the best of it */
364                 /* Remove ops, but leave the name for debugging */
365                 OBD_FREE_PTR(type->typ_dt_ops);
366                 OBD_FREE_PTR(type->typ_md_ops);
367                 RETURN(-EBUSY);
368         }
369
370         kobject_put(&type->typ_kobj);
371
372         RETURN(0);
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
375
376 /**
377  * Create a new obd device.
378  *
379  * Allocate the new obd_device and initialize it.
380  *
381  * \param[in] type_name obd device type string.
382  * \param[in] name      obd device name.
383  * \param[in] uuid      obd device UUID
384  *
385  * \retval newdev         pointer to created obd_device
386  * \retval ERR_PTR(errno) on error
387  */
388 struct obd_device *class_newdev(const char *type_name, const char *name,
389                                 const char *uuid)
390 {
391         struct obd_device *newdev;
392         struct obd_type *type = NULL;
393         ENTRY;
394
395         if (strlen(name) >= MAX_OBD_NAME) {
396                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397                 RETURN(ERR_PTR(-EINVAL));
398         }
399
400         type = class_get_type(type_name);
401         if (type == NULL){
402                 CERROR("OBD: unknown type: %s\n", type_name);
403                 RETURN(ERR_PTR(-ENODEV));
404         }
405
406         newdev = obd_device_alloc();
407         if (newdev == NULL) {
408                 class_put_type(type);
409                 RETURN(ERR_PTR(-ENOMEM));
410         }
411         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413         newdev->obd_type = type;
414         newdev->obd_minor = -1;
415
416         rwlock_init(&newdev->obd_pool_lock);
417         newdev->obd_pool_limit = 0;
418         newdev->obd_pool_slv = 0;
419
420         INIT_LIST_HEAD(&newdev->obd_exports);
421         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423         INIT_LIST_HEAD(&newdev->obd_exports_timed);
424         INIT_LIST_HEAD(&newdev->obd_nid_stats);
425         spin_lock_init(&newdev->obd_nid_lock);
426         spin_lock_init(&newdev->obd_dev_lock);
427         mutex_init(&newdev->obd_dev_mutex);
428         spin_lock_init(&newdev->obd_osfs_lock);
429         /* newdev->obd_osfs_age must be set to a value in the distant
430          * past to guarantee a fresh statfs is fetched on mount. */
431         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
432
433         /* XXX belongs in setup not attach  */
434         init_rwsem(&newdev->obd_observer_link_sem);
435         /* recovery data */
436         spin_lock_init(&newdev->obd_recovery_task_lock);
437         init_waitqueue_head(&newdev->obd_next_transno_waitq);
438         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
439         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
440         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
441         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
442         INIT_LIST_HEAD(&newdev->obd_evict_list);
443         INIT_LIST_HEAD(&newdev->obd_lwp_list);
444
445         llog_group_init(&newdev->obd_olg);
446         /* Detach drops this */
447         atomic_set(&newdev->obd_refcount, 1);
448         lu_ref_init(&newdev->obd_reference);
449         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
450
451         newdev->obd_conn_inprogress = 0;
452
453         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
454
455         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
456                newdev->obd_name, newdev);
457
458         return newdev;
459 }
460
461 /**
462  * Free obd device.
463  *
464  * \param[in] obd obd_device to be freed
465  *
466  * \retval none
467  */
468 void class_free_dev(struct obd_device *obd)
469 {
470         struct obd_type *obd_type = obd->obd_type;
471
472         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
473                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
474         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
475                  "obd %p != obd_devs[%d] %p\n",
476                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
477         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
478                  "obd_refcount should be 0, not %d\n",
479                  atomic_read(&obd->obd_refcount));
480         LASSERT(obd_type != NULL);
481
482         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
483                obd->obd_name, obd->obd_type->typ_name);
484
485         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
486                          obd->obd_name, obd->obd_uuid.uuid);
487         if (obd->obd_stopping) {
488                 int err;
489
490                 /* If we're not stopping, we were never set up */
491                 err = obd_cleanup(obd);
492                 if (err)
493                         CERROR("Cleanup %s returned %d\n",
494                                 obd->obd_name, err);
495         }
496
497         obd_device_free(obd);
498
499         class_put_type(obd_type);
500 }
501
502 /**
503  * Unregister obd device.
504  *
505  * Free slot in obd_dev[] used by \a obd.
506  *
507  * \param[in] new_obd obd_device to be unregistered
508  *
509  * \retval none
510  */
511 void class_unregister_device(struct obd_device *obd)
512 {
513         write_lock(&obd_dev_lock);
514         if (obd->obd_minor >= 0) {
515                 LASSERT(obd_devs[obd->obd_minor] == obd);
516                 obd_devs[obd->obd_minor] = NULL;
517                 obd->obd_minor = -1;
518         }
519         write_unlock(&obd_dev_lock);
520 }
521
522 /**
523  * Register obd device.
524  *
525  * Find free slot in obd_devs[], fills it with \a new_obd.
526  *
527  * \param[in] new_obd obd_device to be registered
528  *
529  * \retval 0          success
530  * \retval -EEXIST    device with this name is registered
531  * \retval -EOVERFLOW obd_devs[] is full
532  */
533 int class_register_device(struct obd_device *new_obd)
534 {
535         int ret = 0;
536         int i;
537         int new_obd_minor = 0;
538         bool minor_assign = false;
539         bool retried = false;
540
541 again:
542         write_lock(&obd_dev_lock);
543         for (i = 0; i < class_devno_max(); i++) {
544                 struct obd_device *obd = class_num2obd(i);
545
546                 if (obd != NULL &&
547                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
548
549                         if (!retried) {
550                                 write_unlock(&obd_dev_lock);
551
552                                 /* the obd_device could be waited to be
553                                  * destroyed by the "obd_zombie_impexp_thread".
554                                  */
555                                 obd_zombie_barrier();
556                                 retried = true;
557                                 goto again;
558                         }
559
560                         CERROR("%s: already exists, won't add\n",
561                                obd->obd_name);
562                         /* in case we found a free slot before duplicate */
563                         minor_assign = false;
564                         ret = -EEXIST;
565                         break;
566                 }
567                 if (!minor_assign && obd == NULL) {
568                         new_obd_minor = i;
569                         minor_assign = true;
570                 }
571         }
572
573         if (minor_assign) {
574                 new_obd->obd_minor = new_obd_minor;
575                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
576                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
577                 obd_devs[new_obd_minor] = new_obd;
578         } else {
579                 if (ret == 0) {
580                         ret = -EOVERFLOW;
581                         CERROR("%s: all %u/%u devices used, increase "
582                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
583                                i, class_devno_max(), ret);
584                 }
585         }
586         write_unlock(&obd_dev_lock);
587
588         RETURN(ret);
589 }
590
591 static int class_name2dev_nolock(const char *name)
592 {
593         int i;
594
595         if (!name)
596                 return -1;
597
598         for (i = 0; i < class_devno_max(); i++) {
599                 struct obd_device *obd = class_num2obd(i);
600
601                 if (obd && strcmp(name, obd->obd_name) == 0) {
602                         /* Make sure we finished attaching before we give
603                            out any references */
604                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
605                         if (obd->obd_attached) {
606                                 return i;
607                         }
608                         break;
609                 }
610         }
611
612         return -1;
613 }
614
615 int class_name2dev(const char *name)
616 {
617         int i;
618
619         if (!name)
620                 return -1;
621
622         read_lock(&obd_dev_lock);
623         i = class_name2dev_nolock(name);
624         read_unlock(&obd_dev_lock);
625
626         return i;
627 }
628 EXPORT_SYMBOL(class_name2dev);
629
630 struct obd_device *class_name2obd(const char *name)
631 {
632         int dev = class_name2dev(name);
633
634         if (dev < 0 || dev > class_devno_max())
635                 return NULL;
636         return class_num2obd(dev);
637 }
638 EXPORT_SYMBOL(class_name2obd);
639
640 int class_uuid2dev_nolock(struct obd_uuid *uuid)
641 {
642         int i;
643
644         for (i = 0; i < class_devno_max(); i++) {
645                 struct obd_device *obd = class_num2obd(i);
646
647                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
648                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
649                         return i;
650                 }
651         }
652
653         return -1;
654 }
655
656 int class_uuid2dev(struct obd_uuid *uuid)
657 {
658         int i;
659
660         read_lock(&obd_dev_lock);
661         i = class_uuid2dev_nolock(uuid);
662         read_unlock(&obd_dev_lock);
663
664         return i;
665 }
666 EXPORT_SYMBOL(class_uuid2dev);
667
668 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
669 {
670         int dev = class_uuid2dev(uuid);
671         if (dev < 0)
672                 return NULL;
673         return class_num2obd(dev);
674 }
675 EXPORT_SYMBOL(class_uuid2obd);
676
677 /**
678  * Get obd device from ::obd_devs[]
679  *
680  * \param num [in] array index
681  *
682  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
683  *         otherwise return the obd device there.
684  */
685 struct obd_device *class_num2obd(int num)
686 {
687         struct obd_device *obd = NULL;
688
689         if (num < class_devno_max()) {
690                 obd = obd_devs[num];
691                 if (obd == NULL)
692                         return NULL;
693
694                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
695                          "%p obd_magic %08x != %08x\n",
696                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
697                 LASSERTF(obd->obd_minor == num,
698                          "%p obd_minor %0d != %0d\n",
699                          obd, obd->obd_minor, num);
700         }
701
702         return obd;
703 }
704
705 /**
706  * Find obd in obd_dev[] by name or uuid.
707  *
708  * Increment obd's refcount if found.
709  *
710  * \param[in] str obd name or uuid
711  *
712  * \retval NULL    if not found
713  * \retval target  pointer to found obd_device
714  */
715 struct obd_device *class_dev_by_str(const char *str)
716 {
717         struct obd_device *target = NULL;
718         struct obd_uuid tgtuuid;
719         int rc;
720
721         obd_str2uuid(&tgtuuid, str);
722
723         read_lock(&obd_dev_lock);
724         rc = class_uuid2dev_nolock(&tgtuuid);
725         if (rc < 0)
726                 rc = class_name2dev_nolock(str);
727
728         if (rc >= 0)
729                 target = class_num2obd(rc);
730
731         if (target != NULL)
732                 class_incref(target, "find", current);
733         read_unlock(&obd_dev_lock);
734
735         RETURN(target);
736 }
737 EXPORT_SYMBOL(class_dev_by_str);
738
739 /**
740  * Get obd devices count. Device in any
741  *    state are counted
742  * \retval obd device count
743  */
744 int get_devices_count(void)
745 {
746         int index, max_index = class_devno_max(), dev_count = 0;
747
748         read_lock(&obd_dev_lock);
749         for (index = 0; index <= max_index; index++) {
750                 struct obd_device *obd = class_num2obd(index);
751                 if (obd != NULL)
752                         dev_count++;
753         }
754         read_unlock(&obd_dev_lock);
755
756         return dev_count;
757 }
758 EXPORT_SYMBOL(get_devices_count);
759
760 void class_obd_list(void)
761 {
762         char *status;
763         int i;
764
765         read_lock(&obd_dev_lock);
766         for (i = 0; i < class_devno_max(); i++) {
767                 struct obd_device *obd = class_num2obd(i);
768
769                 if (obd == NULL)
770                         continue;
771                 if (obd->obd_stopping)
772                         status = "ST";
773                 else if (obd->obd_set_up)
774                         status = "UP";
775                 else if (obd->obd_attached)
776                         status = "AT";
777                 else
778                         status = "--";
779                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
780                          i, status, obd->obd_type->typ_name,
781                          obd->obd_name, obd->obd_uuid.uuid,
782                          atomic_read(&obd->obd_refcount));
783         }
784         read_unlock(&obd_dev_lock);
785         return;
786 }
787
788 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
789    specified, then only the client with that uuid is returned,
790    otherwise any client connected to the tgt is returned. */
791 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
792                                           const char *type_name,
793                                           struct obd_uuid *grp_uuid)
794 {
795         int i;
796
797         read_lock(&obd_dev_lock);
798         for (i = 0; i < class_devno_max(); i++) {
799                 struct obd_device *obd = class_num2obd(i);
800
801                 if (obd == NULL)
802                         continue;
803                 if ((strncmp(obd->obd_type->typ_name, type_name,
804                              strlen(type_name)) == 0)) {
805                         if (obd_uuid_equals(tgt_uuid,
806                                             &obd->u.cli.cl_target_uuid) &&
807                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
808                                                          &obd->obd_uuid) : 1)) {
809                                 read_unlock(&obd_dev_lock);
810                                 return obd;
811                         }
812                 }
813         }
814         read_unlock(&obd_dev_lock);
815
816         return NULL;
817 }
818 EXPORT_SYMBOL(class_find_client_obd);
819
820 /* Iterate the obd_device list looking devices have grp_uuid. Start
821    searching at *next, and if a device is found, the next index to look
822    at is saved in *next. If next is NULL, then the first matching device
823    will always be returned. */
824 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
825 {
826         int i;
827
828         if (next == NULL)
829                 i = 0;
830         else if (*next >= 0 && *next < class_devno_max())
831                 i = *next;
832         else
833                 return NULL;
834
835         read_lock(&obd_dev_lock);
836         for (; i < class_devno_max(); i++) {
837                 struct obd_device *obd = class_num2obd(i);
838
839                 if (obd == NULL)
840                         continue;
841                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
842                         if (next != NULL)
843                                 *next = i+1;
844                         read_unlock(&obd_dev_lock);
845                         return obd;
846                 }
847         }
848         read_unlock(&obd_dev_lock);
849
850         return NULL;
851 }
852 EXPORT_SYMBOL(class_devices_in_group);
853
/**
 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
 * adjust sptlrpc settings accordingly.
 *
 * Every set-up, non-stopping device of type mdc/osc/osp/lwp/mdt/ost whose
 * name starts with the first \a namelen bytes of \a fsname is sent
 * KEY_SPTLRPC_CONF through obd_set_info_async().
 *
 * \param[in] fsname   filesystem name used as device name prefix
 * \param[in] namelen  number of bytes of \a fsname to compare, must be > 0
 *
 * \retval 0 if every notification succeeded, otherwise the first non-zero
 *         return code seen
 */
int class_notify_sptlrpc_conf(const char *fsname, int namelen)
{
        struct obd_device  *obd;
        const char         *type;
        int                 i, rc = 0, rc2;

        LASSERT(namelen > 0);

        read_lock(&obd_dev_lock);
        for (i = 0; i < class_devno_max(); i++) {
                obd = class_num2obd(i);

                if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
                        continue;

                /* only notify mdc, osc, osp, lwp, mdt, ost
                 * because only these have a -sptlrpc llog */
                type = obd->obd_type->typ_name;
                if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
                    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
                    strcmp(type, LUSTRE_OSP_NAME) != 0 &&
                    strcmp(type, LUSTRE_LWP_NAME) != 0 &&
                    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
                    strcmp(type, LUSTRE_OST_NAME) != 0)
                        continue;

                if (strncmp(obd->obd_name, fsname, namelen))
                        continue;

                /* take a device reference, then drop obd_dev_lock around the
                 * notification; the lock is re-taken before the next
                 * iteration */
                class_incref(obd, __FUNCTION__, obd);
                read_unlock(&obd_dev_lock);
                rc2 = obd_set_info_async(NULL, obd->obd_self_export,
                                         sizeof(KEY_SPTLRPC_CONF),
                                         KEY_SPTLRPC_CONF, 0, NULL, NULL);
                /* remember only the first failure */
                rc = rc ? rc : rc2;
                class_decref(obd, __FUNCTION__, obd);
                read_lock(&obd_dev_lock);
        }
        read_unlock(&obd_dev_lock);
        return rc;
}
899 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
900
901 void obd_cleanup_caches(void)
902 {
903         ENTRY;
904         if (obd_device_cachep) {
905                 kmem_cache_destroy(obd_device_cachep);
906                 obd_device_cachep = NULL;
907         }
908
909         EXIT;
910 }
911
912 int obd_init_caches(void)
913 {
914         int rc;
915         ENTRY;
916
917         LASSERT(obd_device_cachep == NULL);
918         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
919                                 sizeof(struct obd_device),
920                                 0, 0, 0, sizeof(struct obd_device), NULL);
921         if (!obd_device_cachep)
922                 GOTO(out, rc = -ENOMEM);
923
924         RETURN(0);
925 out:
926         obd_cleanup_caches();
927         RETURN(rc);
928 }
929
930 static struct portals_handle_ops export_handle_ops;
931
932 /* map connection to client */
933 struct obd_export *class_conn2export(struct lustre_handle *conn)
934 {
935         struct obd_export *export;
936         ENTRY;
937
938         if (!conn) {
939                 CDEBUG(D_CACHE, "looking for null handle\n");
940                 RETURN(NULL);
941         }
942
943         if (conn->cookie == -1) {  /* this means assign a new connection */
944                 CDEBUG(D_CACHE, "want a new connection\n");
945                 RETURN(NULL);
946         }
947
948         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
949         export = class_handle2object(conn->cookie, &export_handle_ops);
950         RETURN(export);
951 }
952 EXPORT_SYMBOL(class_conn2export);
953
954 struct obd_device *class_exp2obd(struct obd_export *exp)
955 {
956         if (exp)
957                 return exp->exp_obd;
958         return NULL;
959 }
960 EXPORT_SYMBOL(class_exp2obd);
961
962 struct obd_import *class_exp2cliimp(struct obd_export *exp)
963 {
964         struct obd_device *obd = exp->exp_obd;
965         if (obd == NULL)
966                 return NULL;
967         return obd->u.cli.cl_import;
968 }
969 EXPORT_SYMBOL(class_exp2cliimp);
970
971 /* Export management functions */
972 static void class_export_destroy(struct obd_export *exp)
973 {
974         struct obd_device *obd = exp->exp_obd;
975         ENTRY;
976
977         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
978         LASSERT(obd != NULL);
979
980         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
981                exp->exp_client_uuid.uuid, obd->obd_name);
982
983         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
984         if (exp->exp_connection)
985                 ptlrpc_put_connection_superhack(exp->exp_connection);
986
987         LASSERT(list_empty(&exp->exp_outstanding_replies));
988         LASSERT(list_empty(&exp->exp_uncommitted_replies));
989         LASSERT(list_empty(&exp->exp_req_replay_queue));
990         LASSERT(list_empty(&exp->exp_hp_rpcs));
991         obd_destroy_export(exp);
992         /* self export doesn't hold a reference to an obd, although it
993          * exists until freeing of the obd */
994         if (exp != obd->obd_self_export)
995                 class_decref(obd, "export", exp);
996
997         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
998         EXIT;
999 }
1000
/* hop_addref callback of export_handle_ops: take a reference on the
 * export passed as an untyped pointer.
 */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
1005
1006 static struct portals_handle_ops export_handle_ops = {
1007         .hop_addref = export_handle_addref,
1008         .hop_free   = NULL,
1009 };
1010
1011 struct obd_export *class_export_get(struct obd_export *exp)
1012 {
1013         atomic_inc(&exp->exp_refcount);
1014         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1015                atomic_read(&exp->exp_refcount));
1016         return exp;
1017 }
1018 EXPORT_SYMBOL(class_export_get);
1019
1020 void class_export_put(struct obd_export *exp)
1021 {
1022         LASSERT(exp != NULL);
1023         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1024         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1025                atomic_read(&exp->exp_refcount) - 1);
1026
1027         if (atomic_dec_and_test(&exp->exp_refcount)) {
1028                 struct obd_device *obd = exp->exp_obd;
1029
1030                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1031                        exp, exp->exp_client_uuid.uuid);
1032
1033                 /* release nid stat refererence */
1034                 lprocfs_exp_cleanup(exp);
1035
1036                 if (exp == obd->obd_self_export) {
1037                         /* self export should be destroyed without
1038                          * zombie thread as it doesn't hold a
1039                          * reference to obd and doesn't hold any
1040                          * resources */
1041                         class_export_destroy(exp);
1042                         /* self export is destroyed, no class
1043                          * references exist and it is safe to free
1044                          * obd */
1045                         class_free_dev(obd);
1046                 } else {
1047                         LASSERT(!list_empty(&exp->exp_obd_chain));
1048                         obd_zombie_export_add(exp);
1049                 }
1050
1051         }
1052 }
1053 EXPORT_SYMBOL(class_export_put);
1054
1055 static void obd_zombie_exp_cull(struct work_struct *ws)
1056 {
1057         struct obd_export *export;
1058
1059         export = container_of(ws, struct obd_export, exp_zombie_work);
1060         class_export_destroy(export);
1061 }
1062
1063 /* Creates a new export, adds it to the hash table, and returns a
1064  * pointer to it. The refcount is 2: one for the hash reference, and
1065  * one for the pointer returned by this function. */
1066 struct obd_export *__class_new_export(struct obd_device *obd,
1067                                       struct obd_uuid *cluuid, bool is_self)
1068 {
1069         struct obd_export *export;
1070         struct cfs_hash *hash = NULL;
1071         int rc = 0;
1072         ENTRY;
1073
1074         OBD_ALLOC_PTR(export);
1075         if (!export)
1076                 return ERR_PTR(-ENOMEM);
1077
1078         export->exp_conn_cnt = 0;
1079         export->exp_lock_hash = NULL;
1080         export->exp_flock_hash = NULL;
1081         /* 2 = class_handle_hash + last */
1082         atomic_set(&export->exp_refcount, 2);
1083         atomic_set(&export->exp_rpc_count, 0);
1084         atomic_set(&export->exp_cb_count, 0);
1085         atomic_set(&export->exp_locks_count, 0);
1086 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1087         INIT_LIST_HEAD(&export->exp_locks_list);
1088         spin_lock_init(&export->exp_locks_list_guard);
1089 #endif
1090         atomic_set(&export->exp_replay_count, 0);
1091         export->exp_obd = obd;
1092         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1093         spin_lock_init(&export->exp_uncommitted_replies_lock);
1094         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1095         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1096         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1097         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1098         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1099         class_handle_hash(&export->exp_handle, &export_handle_ops);
1100         export->exp_last_request_time = ktime_get_real_seconds();
1101         spin_lock_init(&export->exp_lock);
1102         spin_lock_init(&export->exp_rpc_lock);
1103         INIT_HLIST_NODE(&export->exp_uuid_hash);
1104         INIT_HLIST_NODE(&export->exp_nid_hash);
1105         INIT_HLIST_NODE(&export->exp_gen_hash);
1106         spin_lock_init(&export->exp_bl_list_lock);
1107         INIT_LIST_HEAD(&export->exp_bl_list);
1108         INIT_LIST_HEAD(&export->exp_stale_list);
1109         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1110
1111         export->exp_sp_peer = LUSTRE_SP_ANY;
1112         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1113         export->exp_client_uuid = *cluuid;
1114         obd_init_export(export);
1115
1116         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1117                 spin_lock(&obd->obd_dev_lock);
1118                 /* shouldn't happen, but might race */
1119                 if (obd->obd_stopping)
1120                         GOTO(exit_unlock, rc = -ENODEV);
1121
1122                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1123                 if (hash == NULL)
1124                         GOTO(exit_unlock, rc = -ENODEV);
1125                 spin_unlock(&obd->obd_dev_lock);
1126
1127                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1128                 if (rc != 0) {
1129                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1130                                       obd->obd_name, cluuid->uuid, rc);
1131                         GOTO(exit_err, rc = -EALREADY);
1132                 }
1133         }
1134
1135         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1136         spin_lock(&obd->obd_dev_lock);
1137         if (obd->obd_stopping) {
1138                 if (hash)
1139                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1140                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1141         }
1142
1143         if (!is_self) {
1144                 class_incref(obd, "export", export);
1145                 list_add_tail(&export->exp_obd_chain_timed,
1146                               &obd->obd_exports_timed);
1147                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1148                 obd->obd_num_exports++;
1149         } else {
1150                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1151                 INIT_LIST_HEAD(&export->exp_obd_chain);
1152         }
1153         spin_unlock(&obd->obd_dev_lock);
1154         if (hash)
1155                 cfs_hash_putref(hash);
1156         RETURN(export);
1157
1158 exit_unlock:
1159         spin_unlock(&obd->obd_dev_lock);
1160 exit_err:
1161         if (hash)
1162                 cfs_hash_putref(hash);
1163         class_handle_unhash(&export->exp_handle);
1164         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1165         obd_destroy_export(export);
1166         OBD_FREE_PTR(export);
1167         return ERR_PTR(rc);
1168 }
1169
1170 struct obd_export *class_new_export(struct obd_device *obd,
1171                                     struct obd_uuid *uuid)
1172 {
1173         return __class_new_export(obd, uuid, false);
1174 }
1175 EXPORT_SYMBOL(class_new_export);
1176
1177 struct obd_export *class_new_export_self(struct obd_device *obd,
1178                                          struct obd_uuid *uuid)
1179 {
1180         return __class_new_export(obd, uuid, true);
1181 }
1182
1183 void class_unlink_export(struct obd_export *exp)
1184 {
1185         class_handle_unhash(&exp->exp_handle);
1186
1187         if (exp->exp_obd->obd_self_export == exp) {
1188                 class_export_put(exp);
1189                 return;
1190         }
1191
1192         spin_lock(&exp->exp_obd->obd_dev_lock);
1193         /* delete an uuid-export hashitem from hashtables */
1194         if (!hlist_unhashed(&exp->exp_uuid_hash))
1195                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1196                              &exp->exp_client_uuid,
1197                              &exp->exp_uuid_hash);
1198
1199 #ifdef HAVE_SERVER_SUPPORT
1200         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1201                 struct tg_export_data   *ted = &exp->exp_target_data;
1202                 struct cfs_hash         *hash;
1203
1204                 /* Because obd_gen_hash will not be released until
1205                  * class_cleanup(), so hash should never be NULL here */
1206                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1207                 LASSERT(hash != NULL);
1208                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1209                              &exp->exp_gen_hash);
1210                 cfs_hash_putref(hash);
1211         }
1212 #endif /* HAVE_SERVER_SUPPORT */
1213
1214         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1215         list_del_init(&exp->exp_obd_chain_timed);
1216         exp->exp_obd->obd_num_exports--;
1217         spin_unlock(&exp->exp_obd->obd_dev_lock);
1218         atomic_inc(&obd_stale_export_num);
1219
1220         /* A reference is kept by obd_stale_exports list */
1221         obd_stale_export_put(exp);
1222 }
1223 EXPORT_SYMBOL(class_unlink_export);
1224
1225 /* Import management functions */
1226 static void obd_zombie_import_free(struct obd_import *imp)
1227 {
1228         ENTRY;
1229
1230         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1231                 imp->imp_obd->obd_name);
1232
1233         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1234
1235         ptlrpc_put_connection_superhack(imp->imp_connection);
1236
1237         while (!list_empty(&imp->imp_conn_list)) {
1238                 struct obd_import_conn *imp_conn;
1239
1240                 imp_conn = list_entry(imp->imp_conn_list.next,
1241                                       struct obd_import_conn, oic_item);
1242                 list_del_init(&imp_conn->oic_item);
1243                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1244                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1245         }
1246
1247         LASSERT(imp->imp_sec == NULL);
1248         class_decref(imp->imp_obd, "import", imp);
1249         OBD_FREE_PTR(imp);
1250         EXIT;
1251 }
1252
1253 struct obd_import *class_import_get(struct obd_import *import)
1254 {
1255         atomic_inc(&import->imp_refcount);
1256         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1257                atomic_read(&import->imp_refcount),
1258                import->imp_obd->obd_name);
1259         return import;
1260 }
1261 EXPORT_SYMBOL(class_import_get);
1262
1263 void class_import_put(struct obd_import *imp)
1264 {
1265         ENTRY;
1266
1267         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1268
1269         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1270                atomic_read(&imp->imp_refcount) - 1,
1271                imp->imp_obd->obd_name);
1272
1273         if (atomic_dec_and_test(&imp->imp_refcount)) {
1274                 CDEBUG(D_INFO, "final put import %p\n", imp);
1275                 obd_zombie_import_add(imp);
1276         }
1277
1278         /* catch possible import put race */
1279         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1280         EXIT;
1281 }
1282 EXPORT_SYMBOL(class_import_put);
1283
1284 static void init_imp_at(struct imp_at *at) {
1285         int i;
1286         at_init(&at->iat_net_latency, 0, 0);
1287         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1288                 /* max service estimates are tracked on the server side, so
1289                    don't use the AT history here, just use the last reported
1290                    val. (But keep hist for proc histogram, worst_ever) */
1291                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1292                         AT_FLG_NOHIST);
1293         }
1294 }
1295
1296 static void obd_zombie_imp_cull(struct work_struct *ws)
1297 {
1298         struct obd_import *import;
1299
1300         import = container_of(ws, struct obd_import, imp_zombie_work);
1301         obd_zombie_import_free(import);
1302 }
1303
1304 struct obd_import *class_new_import(struct obd_device *obd)
1305 {
1306         struct obd_import *imp;
1307         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1308
1309         OBD_ALLOC(imp, sizeof(*imp));
1310         if (imp == NULL)
1311                 return NULL;
1312
1313         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1314         INIT_LIST_HEAD(&imp->imp_replay_list);
1315         INIT_LIST_HEAD(&imp->imp_sending_list);
1316         INIT_LIST_HEAD(&imp->imp_delayed_list);
1317         INIT_LIST_HEAD(&imp->imp_committed_list);
1318         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1319         imp->imp_known_replied_xid = 0;
1320         imp->imp_replay_cursor = &imp->imp_committed_list;
1321         spin_lock_init(&imp->imp_lock);
1322         imp->imp_last_success_conn = 0;
1323         imp->imp_state = LUSTRE_IMP_NEW;
1324         imp->imp_obd = class_incref(obd, "import", imp);
1325         rwlock_init(&imp->imp_sec_lock);
1326         init_waitqueue_head(&imp->imp_recovery_waitq);
1327         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1328
1329         if (curr_pid_ns->child_reaper)
1330                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1331         else
1332                 imp->imp_sec_refpid = 1;
1333
1334         atomic_set(&imp->imp_refcount, 2);
1335         atomic_set(&imp->imp_unregistering, 0);
1336         atomic_set(&imp->imp_inflight, 0);
1337         atomic_set(&imp->imp_replay_inflight, 0);
1338         atomic_set(&imp->imp_inval_count, 0);
1339         INIT_LIST_HEAD(&imp->imp_conn_list);
1340         init_imp_at(&imp->imp_at);
1341
1342         /* the default magic is V2, will be used in connect RPC, and
1343          * then adjusted according to the flags in request/reply. */
1344         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1345
1346         return imp;
1347 }
1348 EXPORT_SYMBOL(class_new_import);
1349
1350 void class_destroy_import(struct obd_import *import)
1351 {
1352         LASSERT(import != NULL);
1353         LASSERT(import != LP_POISON);
1354
1355         spin_lock(&import->imp_lock);
1356         import->imp_generation++;
1357         spin_unlock(&import->imp_lock);
1358         class_import_put(import);
1359 }
1360 EXPORT_SYMBOL(class_destroy_import);
1361
1362 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1363
1364 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1365 {
1366         spin_lock(&exp->exp_locks_list_guard);
1367
1368         LASSERT(lock->l_exp_refs_nr >= 0);
1369
1370         if (lock->l_exp_refs_target != NULL &&
1371             lock->l_exp_refs_target != exp) {
1372                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1373                               exp, lock, lock->l_exp_refs_target);
1374         }
1375         if ((lock->l_exp_refs_nr ++) == 0) {
1376                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1377                 lock->l_exp_refs_target = exp;
1378         }
1379         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1380                lock, exp, lock->l_exp_refs_nr);
1381         spin_unlock(&exp->exp_locks_list_guard);
1382 }
1383 EXPORT_SYMBOL(__class_export_add_lock_ref);
1384
1385 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1386 {
1387         spin_lock(&exp->exp_locks_list_guard);
1388         LASSERT(lock->l_exp_refs_nr > 0);
1389         if (lock->l_exp_refs_target != exp) {
1390                 LCONSOLE_WARN("lock %p, "
1391                               "mismatching export pointers: %p, %p\n",
1392                               lock, lock->l_exp_refs_target, exp);
1393         }
1394         if (-- lock->l_exp_refs_nr == 0) {
1395                 list_del_init(&lock->l_exp_refs_link);
1396                 lock->l_exp_refs_target = NULL;
1397         }
1398         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1399                lock, exp, lock->l_exp_refs_nr);
1400         spin_unlock(&exp->exp_locks_list_guard);
1401 }
1402 EXPORT_SYMBOL(__class_export_del_lock_ref);
1403 #endif
1404
1405 /* A connection defines an export context in which preallocation can
1406    be managed. This releases the export pointer reference, and returns
1407    the export handle, so the export refcount is 1 when this function
1408    returns. */
1409 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1410                   struct obd_uuid *cluuid)
1411 {
1412         struct obd_export *export;
1413         LASSERT(conn != NULL);
1414         LASSERT(obd != NULL);
1415         LASSERT(cluuid != NULL);
1416         ENTRY;
1417
1418         export = class_new_export(obd, cluuid);
1419         if (IS_ERR(export))
1420                 RETURN(PTR_ERR(export));
1421
1422         conn->cookie = export->exp_handle.h_cookie;
1423         class_export_put(export);
1424
1425         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1426                cluuid->uuid, conn->cookie);
1427         RETURN(0);
1428 }
1429 EXPORT_SYMBOL(class_connect);
1430
1431 /* if export is involved in recovery then clean up related things */
1432 static void class_export_recovery_cleanup(struct obd_export *exp)
1433 {
1434         struct obd_device *obd = exp->exp_obd;
1435
1436         spin_lock(&obd->obd_recovery_task_lock);
1437         if (obd->obd_recovering) {
1438                 if (exp->exp_in_recovery) {
1439                         spin_lock(&exp->exp_lock);
1440                         exp->exp_in_recovery = 0;
1441                         spin_unlock(&exp->exp_lock);
1442                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1443                         atomic_dec(&obd->obd_connected_clients);
1444                 }
1445
1446                 /* if called during recovery then should update
1447                  * obd_stale_clients counter,
1448                  * lightweight exports are not counted */
1449                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1450                         exp->exp_obd->obd_stale_clients++;
1451         }
1452         spin_unlock(&obd->obd_recovery_task_lock);
1453
1454         spin_lock(&exp->exp_lock);
1455         /** Cleanup req replay fields */
1456         if (exp->exp_req_replay_needed) {
1457                 exp->exp_req_replay_needed = 0;
1458
1459                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1460                 atomic_dec(&obd->obd_req_replay_clients);
1461         }
1462
1463         /** Cleanup lock replay data */
1464         if (exp->exp_lock_replay_needed) {
1465                 exp->exp_lock_replay_needed = 0;
1466
1467                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1468                 atomic_dec(&obd->obd_lock_replay_clients);
1469         }
1470         spin_unlock(&exp->exp_lock);
1471 }
1472
1473 /* This function removes 1-3 references from the export:
1474  * 1 - for export pointer passed
1475  * and if disconnect really need
1476  * 2 - removing from hash
1477  * 3 - in client_unlink_export
1478  * The export pointer passed to this function can destroyed */
1479 int class_disconnect(struct obd_export *export)
1480 {
1481         int already_disconnected;
1482         ENTRY;
1483
1484         if (export == NULL) {
1485                 CWARN("attempting to free NULL export %p\n", export);
1486                 RETURN(-EINVAL);
1487         }
1488
1489         spin_lock(&export->exp_lock);
1490         already_disconnected = export->exp_disconnected;
1491         export->exp_disconnected = 1;
1492         /*  We hold references of export for uuid hash
1493          *  and nid_hash and export link at least. So
1494          *  it is safe to call cfs_hash_del in there.  */
1495         if (!hlist_unhashed(&export->exp_nid_hash))
1496                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1497                              &export->exp_connection->c_peer.nid,
1498                              &export->exp_nid_hash);
1499         spin_unlock(&export->exp_lock);
1500
1501         /* class_cleanup(), abort_recovery(), and class_fail_export()
1502          * all end up in here, and if any of them race we shouldn't
1503          * call extra class_export_puts(). */
1504         if (already_disconnected) {
1505                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1506                 GOTO(no_disconn, already_disconnected);
1507         }
1508
1509         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1510                export->exp_handle.h_cookie);
1511
1512         class_export_recovery_cleanup(export);
1513         class_unlink_export(export);
1514 no_disconn:
1515         class_export_put(export);
1516         RETURN(0);
1517 }
1518 EXPORT_SYMBOL(class_disconnect);
1519
1520 /* Return non-zero for a fully connected export */
1521 int class_connected_export(struct obd_export *exp)
1522 {
1523         int connected = 0;
1524
1525         if (exp) {
1526                 spin_lock(&exp->exp_lock);
1527                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1528                 spin_unlock(&exp->exp_lock);
1529         }
1530         return connected;
1531 }
1532 EXPORT_SYMBOL(class_connected_export);
1533
1534 static void class_disconnect_export_list(struct list_head *list,
1535                                          enum obd_option flags)
1536 {
1537         int rc;
1538         struct obd_export *exp;
1539         ENTRY;
1540
1541         /* It's possible that an export may disconnect itself, but
1542          * nothing else will be added to this list. */
1543         while (!list_empty(list)) {
1544                 exp = list_entry(list->next, struct obd_export,
1545                                  exp_obd_chain);
1546                 /* need for safe call CDEBUG after obd_disconnect */
1547                 class_export_get(exp);
1548
1549                 spin_lock(&exp->exp_lock);
1550                 exp->exp_flags = flags;
1551                 spin_unlock(&exp->exp_lock);
1552
1553                 if (obd_uuid_equals(&exp->exp_client_uuid,
1554                                     &exp->exp_obd->obd_uuid)) {
1555                         CDEBUG(D_HA,
1556                                "exp %p export uuid == obd uuid, don't discon\n",
1557                                exp);
1558                         /* Need to delete this now so we don't end up pointing
1559                          * to work_list later when this export is cleaned up. */
1560                         list_del_init(&exp->exp_obd_chain);
1561                         class_export_put(exp);
1562                         continue;
1563                 }
1564
1565                 class_export_get(exp);
1566                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1567                        "last request at %lld\n",
1568                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1569                        exp, exp->exp_last_request_time);
1570                 /* release one export reference anyway */
1571                 rc = obd_disconnect(exp);
1572
1573                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1574                        obd_export_nid2str(exp), exp, rc);
1575                 class_export_put(exp);
1576         }
1577         EXIT;
1578 }
1579
1580 void class_disconnect_exports(struct obd_device *obd)
1581 {
1582         struct list_head work_list;
1583         ENTRY;
1584
1585         /* Move all of the exports from obd_exports to a work list, en masse. */
1586         INIT_LIST_HEAD(&work_list);
1587         spin_lock(&obd->obd_dev_lock);
1588         list_splice_init(&obd->obd_exports, &work_list);
1589         list_splice_init(&obd->obd_delayed_exports, &work_list);
1590         spin_unlock(&obd->obd_dev_lock);
1591
1592         if (!list_empty(&work_list)) {
1593                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1594                        "disconnecting them\n", obd->obd_minor, obd);
1595                 class_disconnect_export_list(&work_list,
1596                                              exp_flags_from_obd(obd));
1597         } else
1598                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1599                        obd->obd_minor, obd);
1600         EXIT;
1601 }
1602 EXPORT_SYMBOL(class_disconnect_exports);
1603
1604 /* Remove exports that have not completed recovery.
1605  */
1606 void class_disconnect_stale_exports(struct obd_device *obd,
1607                                     int (*test_export)(struct obd_export *))
1608 {
1609         struct list_head work_list;
1610         struct obd_export *exp, *n;
1611         int evicted = 0;
1612         ENTRY;
1613
1614         INIT_LIST_HEAD(&work_list);
1615         spin_lock(&obd->obd_dev_lock);
1616         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1617                                  exp_obd_chain) {
1618                 /* don't count self-export as client */
1619                 if (obd_uuid_equals(&exp->exp_client_uuid,
1620                                     &exp->exp_obd->obd_uuid))
1621                         continue;
1622
1623                 /* don't evict clients which have no slot in last_rcvd
1624                  * (e.g. lightweight connection) */
1625                 if (exp->exp_target_data.ted_lr_idx == -1)
1626                         continue;
1627
1628                 spin_lock(&exp->exp_lock);
1629                 if (exp->exp_failed || test_export(exp)) {
1630                         spin_unlock(&exp->exp_lock);
1631                         continue;
1632                 }
1633                 exp->exp_failed = 1;
1634                 spin_unlock(&exp->exp_lock);
1635
1636                 list_move(&exp->exp_obd_chain, &work_list);
1637                 evicted++;
1638                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1639                        obd->obd_name, exp->exp_client_uuid.uuid,
1640                        obd_export_nid2str(exp));
1641                 print_export_data(exp, "EVICTING", 0, D_HA);
1642         }
1643         spin_unlock(&obd->obd_dev_lock);
1644
1645         if (evicted)
1646                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1647                               obd->obd_name, evicted);
1648
1649         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1650                                                  OBD_OPT_ABORT_RECOV);
1651         EXIT;
1652 }
1653 EXPORT_SYMBOL(class_disconnect_stale_exports);
1654
1655 void class_fail_export(struct obd_export *exp)
1656 {
1657         int rc, already_failed;
1658
1659         spin_lock(&exp->exp_lock);
1660         already_failed = exp->exp_failed;
1661         exp->exp_failed = 1;
1662         spin_unlock(&exp->exp_lock);
1663
1664         if (already_failed) {
1665                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1666                        exp, exp->exp_client_uuid.uuid);
1667                 return;
1668         }
1669
1670         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1671                exp, exp->exp_client_uuid.uuid);
1672
1673         if (obd_dump_on_timeout)
1674                 libcfs_debug_dumplog();
1675
1676         /* need for safe call CDEBUG after obd_disconnect */
1677         class_export_get(exp);
1678
1679         /* Most callers into obd_disconnect are removing their own reference
1680          * (request, for example) in addition to the one from the hash table.
1681          * We don't have such a reference here, so make one. */
1682         class_export_get(exp);
1683         rc = obd_disconnect(exp);
1684         if (rc)
1685                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1686         else
1687                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1688                        exp, exp->exp_client_uuid.uuid);
1689         class_export_put(exp);
1690 }
1691 EXPORT_SYMBOL(class_fail_export);
1692
1693 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1694 {
1695         struct cfs_hash *nid_hash;
1696         struct obd_export *doomed_exp = NULL;
1697         int exports_evicted = 0;
1698
1699         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1700
1701         spin_lock(&obd->obd_dev_lock);
1702         /* umount has run already, so evict thread should leave
1703          * its task to umount thread now */
1704         if (obd->obd_stopping) {
1705                 spin_unlock(&obd->obd_dev_lock);
1706                 return exports_evicted;
1707         }
1708         nid_hash = obd->obd_nid_hash;
1709         cfs_hash_getref(nid_hash);
1710         spin_unlock(&obd->obd_dev_lock);
1711
1712         do {
1713                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1714                 if (doomed_exp == NULL)
1715                         break;
1716
1717                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1718                          "nid %s found, wanted nid %s, requested nid %s\n",
1719                          obd_export_nid2str(doomed_exp),
1720                          libcfs_nid2str(nid_key), nid);
1721                 LASSERTF(doomed_exp != obd->obd_self_export,
1722                          "self-export is hashed by NID?\n");
1723                 exports_evicted++;
1724                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1725                               "request\n", obd->obd_name,
1726                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1727                               obd_export_nid2str(doomed_exp));
1728                 class_fail_export(doomed_exp);
1729                 class_export_put(doomed_exp);
1730         } while (1);
1731
1732         cfs_hash_putref(nid_hash);
1733
1734         if (!exports_evicted)
1735                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1736                        obd->obd_name, nid);
1737         return exports_evicted;
1738 }
1739 EXPORT_SYMBOL(obd_export_evict_by_nid);
1740
1741 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1742 {
1743         struct cfs_hash *uuid_hash;
1744         struct obd_export *doomed_exp = NULL;
1745         struct obd_uuid doomed_uuid;
1746         int exports_evicted = 0;
1747
1748         spin_lock(&obd->obd_dev_lock);
1749         if (obd->obd_stopping) {
1750                 spin_unlock(&obd->obd_dev_lock);
1751                 return exports_evicted;
1752         }
1753         uuid_hash = obd->obd_uuid_hash;
1754         cfs_hash_getref(uuid_hash);
1755         spin_unlock(&obd->obd_dev_lock);
1756
1757         obd_str2uuid(&doomed_uuid, uuid);
1758         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1759                 CERROR("%s: can't evict myself\n", obd->obd_name);
1760                 cfs_hash_putref(uuid_hash);
1761                 return exports_evicted;
1762         }
1763
1764         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1765
1766         if (doomed_exp == NULL) {
1767                 CERROR("%s: can't disconnect %s: no exports found\n",
1768                        obd->obd_name, uuid);
1769         } else {
1770                 CWARN("%s: evicting %s at adminstrative request\n",
1771                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1772                 class_fail_export(doomed_exp);
1773                 class_export_put(doomed_exp);
1774                 exports_evicted++;
1775         }
1776         cfs_hash_putref(uuid_hash);
1777
1778         return exports_evicted;
1779 }
1780
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; print_export_data() invokes it when it is set and
 * lock dumping was requested. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1785
1786 static void print_export_data(struct obd_export *exp, const char *status,
1787                               int locks, int debug_level)
1788 {
1789         struct ptlrpc_reply_state *rs;
1790         struct ptlrpc_reply_state *first_reply = NULL;
1791         int nreplies = 0;
1792
1793         spin_lock(&exp->exp_lock);
1794         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1795                             rs_exp_list) {
1796                 if (nreplies == 0)
1797                         first_reply = rs;
1798                 nreplies++;
1799         }
1800         spin_unlock(&exp->exp_lock);
1801
1802         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1803                "%p %s %llu stale:%d\n",
1804                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1805                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1806                atomic_read(&exp->exp_rpc_count),
1807                atomic_read(&exp->exp_cb_count),
1808                atomic_read(&exp->exp_locks_count),
1809                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1810                nreplies, first_reply, nreplies > 3 ? "..." : "",
1811                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1812 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1813         if (locks && class_export_dump_hook != NULL)
1814                 class_export_dump_hook(exp);
1815 #endif
1816 }
1817
1818 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1819 {
1820         struct obd_export *exp;
1821
1822         spin_lock(&obd->obd_dev_lock);
1823         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1824                 print_export_data(exp, "ACTIVE", locks, debug_level);
1825         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1826                 print_export_data(exp, "UNLINKED", locks, debug_level);
1827         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1828                 print_export_data(exp, "DELAYED", locks, debug_level);
1829         spin_unlock(&obd->obd_dev_lock);
1830 }
1831
1832 void obd_exports_barrier(struct obd_device *obd)
1833 {
1834         int waited = 2;
1835         LASSERT(list_empty(&obd->obd_exports));
1836         spin_lock(&obd->obd_dev_lock);
1837         while (!list_empty(&obd->obd_unlinked_exports)) {
1838                 spin_unlock(&obd->obd_dev_lock);
1839                 set_current_state(TASK_UNINTERRUPTIBLE);
1840                 schedule_timeout(cfs_time_seconds(waited));
1841                 if (waited > 5 && is_power_of_2(waited)) {
1842                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1843                                       "more than %d seconds. "
1844                                       "The obd refcount = %d. Is it stuck?\n",
1845                                       obd->obd_name, waited,
1846                                       atomic_read(&obd->obd_refcount));
1847                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1848                 }
1849                 waited *= 2;
1850                 spin_lock(&obd->obd_dev_lock);
1851         }
1852         spin_unlock(&obd->obd_dev_lock);
1853 }
1854 EXPORT_SYMBOL(obd_exports_barrier);
1855
1856 /**
1857  * Add export to the obd_zombe thread and notify it.
1858  */
1859 static void obd_zombie_export_add(struct obd_export *exp) {
1860         atomic_dec(&obd_stale_export_num);
1861         spin_lock(&exp->exp_obd->obd_dev_lock);
1862         LASSERT(!list_empty(&exp->exp_obd_chain));
1863         list_del_init(&exp->exp_obd_chain);
1864         spin_unlock(&exp->exp_obd->obd_dev_lock);
1865
1866         queue_work(zombie_wq, &exp->exp_zombie_work);
1867 }
1868
1869 /**
1870  * Add import to the obd_zombe thread and notify it.
1871  */
1872 static void obd_zombie_import_add(struct obd_import *imp) {
1873         LASSERT(imp->imp_sec == NULL);
1874
1875         queue_work(zombie_wq, &imp->imp_zombie_work);
1876 }
1877
1878 /**
1879  * wait when obd_zombie import/export queues become empty
1880  */
1881 void obd_zombie_barrier(void)
1882 {
1883         flush_workqueue(zombie_wq);
1884 }
1885 EXPORT_SYMBOL(obd_zombie_barrier);
1886
1887
1888 struct obd_export *obd_stale_export_get(void)
1889 {
1890         struct obd_export *exp = NULL;
1891         ENTRY;
1892
1893         spin_lock(&obd_stale_export_lock);
1894         if (!list_empty(&obd_stale_exports)) {
1895                 exp = list_entry(obd_stale_exports.next,
1896                                  struct obd_export, exp_stale_list);
1897                 list_del_init(&exp->exp_stale_list);
1898         }
1899         spin_unlock(&obd_stale_export_lock);
1900
1901         if (exp) {
1902                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1903                        atomic_read(&obd_stale_export_num));
1904         }
1905         RETURN(exp);
1906 }
1907 EXPORT_SYMBOL(obd_stale_export_get);
1908
1909 void obd_stale_export_put(struct obd_export *exp)
1910 {
1911         ENTRY;
1912
1913         LASSERT(list_empty(&exp->exp_stale_list));
1914         if (exp->exp_lock_hash &&
1915             atomic_read(&exp->exp_lock_hash->hs_count)) {
1916                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1917                        atomic_read(&obd_stale_export_num));
1918
1919                 spin_lock_bh(&exp->exp_bl_list_lock);
1920                 spin_lock(&obd_stale_export_lock);
1921                 /* Add to the tail if there is no blocked locks,
1922                  * to the head otherwise. */
1923                 if (list_empty(&exp->exp_bl_list))
1924                         list_add_tail(&exp->exp_stale_list,
1925                                       &obd_stale_exports);
1926                 else
1927                         list_add(&exp->exp_stale_list,
1928                                  &obd_stale_exports);
1929
1930                 spin_unlock(&obd_stale_export_lock);
1931                 spin_unlock_bh(&exp->exp_bl_list_lock);
1932         } else {
1933                 class_export_put(exp);
1934         }
1935         EXIT;
1936 }
1937 EXPORT_SYMBOL(obd_stale_export_put);
1938
1939 /**
1940  * Adjust the position of the export in the stale list,
1941  * i.e. move to the head of the list if is needed.
1942  **/
1943 void obd_stale_export_adjust(struct obd_export *exp)
1944 {
1945         LASSERT(exp != NULL);
1946         spin_lock_bh(&exp->exp_bl_list_lock);
1947         spin_lock(&obd_stale_export_lock);
1948
1949         if (!list_empty(&exp->exp_stale_list) &&
1950             !list_empty(&exp->exp_bl_list))
1951                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1952
1953         spin_unlock(&obd_stale_export_lock);
1954         spin_unlock_bh(&exp->exp_bl_list_lock);
1955 }
1956 EXPORT_SYMBOL(obd_stale_export_adjust);
1957
1958 /**
1959  * start destroy zombie import/export thread
1960  */
1961 int obd_zombie_impexp_init(void)
1962 {
1963         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1964         if (!zombie_wq)
1965                 return -ENOMEM;
1966
1967         return 0;
1968 }
1969
1970 /**
1971  * stop destroy zombie import/export thread
1972  */
1973 void obd_zombie_impexp_stop(void)
1974 {
1975         destroy_workqueue(zombie_wq);
1976         LASSERT(list_empty(&obd_stale_exports));
1977 }
1978
1979 /***** Kernel-userspace comm helpers *******/
1980
1981 /* Get length of entire message, including header */
1982 int kuc_len(int payload_len)
1983 {
1984         return sizeof(struct kuc_hdr) + payload_len;
1985 }
1986 EXPORT_SYMBOL(kuc_len);
1987
1988 /* Get a pointer to kuc header, given a ptr to the payload
1989  * @param p Pointer to payload area
1990  * @returns Pointer to kuc header
1991  */
1992 struct kuc_hdr * kuc_ptr(void *p)
1993 {
1994         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1995         LASSERT(lh->kuc_magic == KUC_MAGIC);
1996         return lh;
1997 }
1998 EXPORT_SYMBOL(kuc_ptr);
1999
2000 /* Alloc space for a message, and fill in header
2001  * @return Pointer to payload area
2002  */
2003 void *kuc_alloc(int payload_len, int transport, int type)
2004 {
2005         struct kuc_hdr *lh;
2006         int len = kuc_len(payload_len);
2007
2008         OBD_ALLOC(lh, len);
2009         if (lh == NULL)
2010                 return ERR_PTR(-ENOMEM);
2011
2012         lh->kuc_magic = KUC_MAGIC;
2013         lh->kuc_transport = transport;
2014         lh->kuc_msgtype = type;
2015         lh->kuc_msglen = len;
2016
2017         return (void *)(lh + 1);
2018 }
2019 EXPORT_SYMBOL(kuc_alloc);
2020
/* Free a message, given a pointer to its payload area
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2028
2029 struct obd_request_slot_waiter {
2030         struct list_head        orsw_entry;
2031         wait_queue_head_t       orsw_waitq;
2032         bool                    orsw_signaled;
2033 };
2034
2035 static bool obd_request_slot_avail(struct client_obd *cli,
2036                                    struct obd_request_slot_waiter *orsw)
2037 {
2038         bool avail;
2039
2040         spin_lock(&cli->cl_loi_list_lock);
2041         avail = !!list_empty(&orsw->orsw_entry);
2042         spin_unlock(&cli->cl_loi_list_lock);
2043
2044         return avail;
2045 };
2046
2047 /*
2048  * For network flow control, the RPC sponsor needs to acquire a credit
2049  * before sending the RPC. The credits count for a connection is defined
2050  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2051  * the subsequent RPC sponsors need to wait until others released their
2052  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2053  */
2054 int obd_get_request_slot(struct client_obd *cli)
2055 {
2056         struct obd_request_slot_waiter   orsw;
2057         struct l_wait_info               lwi;
2058         int                              rc;
2059
2060         spin_lock(&cli->cl_loi_list_lock);
2061         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2062                 cli->cl_rpcs_in_flight++;
2063                 spin_unlock(&cli->cl_loi_list_lock);
2064                 return 0;
2065         }
2066
2067         init_waitqueue_head(&orsw.orsw_waitq);
2068         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2069         orsw.orsw_signaled = false;
2070         spin_unlock(&cli->cl_loi_list_lock);
2071
2072         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2073         rc = l_wait_event(orsw.orsw_waitq,
2074                           obd_request_slot_avail(cli, &orsw) ||
2075                           orsw.orsw_signaled,
2076                           &lwi);
2077
2078         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2079          * freed but other (such as obd_put_request_slot) is using it. */
2080         spin_lock(&cli->cl_loi_list_lock);
2081         if (rc != 0) {
2082                 if (!orsw.orsw_signaled) {
2083                         if (list_empty(&orsw.orsw_entry))
2084                                 cli->cl_rpcs_in_flight--;
2085                         else
2086                                 list_del(&orsw.orsw_entry);
2087                 }
2088         }
2089
2090         if (orsw.orsw_signaled) {
2091                 LASSERT(list_empty(&orsw.orsw_entry));
2092
2093                 rc = -EINTR;
2094         }
2095         spin_unlock(&cli->cl_loi_list_lock);
2096
2097         return rc;
2098 }
2099 EXPORT_SYMBOL(obd_get_request_slot);
2100
2101 void obd_put_request_slot(struct client_obd *cli)
2102 {
2103         struct obd_request_slot_waiter *orsw;
2104
2105         spin_lock(&cli->cl_loi_list_lock);
2106         cli->cl_rpcs_in_flight--;
2107
2108         /* If there is free slot, wakeup the first waiter. */
2109         if (!list_empty(&cli->cl_flight_waiters) &&
2110             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2111                 orsw = list_entry(cli->cl_flight_waiters.next,
2112                                   struct obd_request_slot_waiter, orsw_entry);
2113                 list_del_init(&orsw->orsw_entry);
2114                 cli->cl_rpcs_in_flight++;
2115                 wake_up(&orsw->orsw_waitq);
2116         }
2117         spin_unlock(&cli->cl_loi_list_lock);
2118 }
2119 EXPORT_SYMBOL(obd_put_request_slot);
2120
2121 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2122 {
2123         return cli->cl_max_rpcs_in_flight;
2124 }
2125 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2126
2127 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2128 {
2129         struct obd_request_slot_waiter *orsw;
2130         __u32                           old;
2131         int                             diff;
2132         int                             i;
2133         const char *type_name;
2134         int                             rc;
2135
2136         if (max > OBD_MAX_RIF_MAX || max < 1)
2137                 return -ERANGE;
2138
2139         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2140         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2141                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2142                  * strictly lower that max_rpcs_in_flight */
2143                 if (max < 2) {
2144                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2145                                "because it must be higher than "
2146                                "max_mod_rpcs_in_flight value",
2147                                cli->cl_import->imp_obd->obd_name);
2148                         return -ERANGE;
2149                 }
2150                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2151                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2152                         if (rc != 0)
2153                                 return rc;
2154                 }
2155         }
2156
2157         spin_lock(&cli->cl_loi_list_lock);
2158         old = cli->cl_max_rpcs_in_flight;
2159         cli->cl_max_rpcs_in_flight = max;
2160         client_adjust_max_dirty(cli);
2161
2162         diff = max - old;
2163
2164         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2165         for (i = 0; i < diff; i++) {
2166                 if (list_empty(&cli->cl_flight_waiters))
2167                         break;
2168
2169                 orsw = list_entry(cli->cl_flight_waiters.next,
2170                                   struct obd_request_slot_waiter, orsw_entry);
2171                 list_del_init(&orsw->orsw_entry);
2172                 cli->cl_rpcs_in_flight++;
2173                 wake_up(&orsw->orsw_waitq);
2174         }
2175         spin_unlock(&cli->cl_loi_list_lock);
2176
2177         return 0;
2178 }
2179 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2180
2181 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2182 {
2183         return cli->cl_max_mod_rpcs_in_flight;
2184 }
2185 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2186
2187 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2188 {
2189         struct obd_connect_data *ocd;
2190         __u16 maxmodrpcs;
2191         __u16 prev;
2192
2193         if (max > OBD_MAX_RIF_MAX || max < 1)
2194                 return -ERANGE;
2195
2196         /* cannot exceed or equal max_rpcs_in_flight */
2197         if (max >= cli->cl_max_rpcs_in_flight) {
2198                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2199                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2200                        cli->cl_import->imp_obd->obd_name,
2201                        max, cli->cl_max_rpcs_in_flight);
2202                 return -ERANGE;
2203         }
2204
2205         /* cannot exceed max modify RPCs in flight supported by the server */
2206         ocd = &cli->cl_import->imp_connect_data;
2207         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2208                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2209         else
2210                 maxmodrpcs = 1;
2211         if (max > maxmodrpcs) {
2212                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2213                        "higher than max_mod_rpcs_per_client value (%hu) "
2214                        "returned by the server at connection\n",
2215                        cli->cl_import->imp_obd->obd_name,
2216                        max, maxmodrpcs);
2217                 return -ERANGE;
2218         }
2219
2220         spin_lock(&cli->cl_mod_rpcs_lock);
2221
2222         prev = cli->cl_max_mod_rpcs_in_flight;
2223         cli->cl_max_mod_rpcs_in_flight = max;
2224
2225         /* wakeup waiters if limit has been increased */
2226         if (cli->cl_max_mod_rpcs_in_flight > prev)
2227                 wake_up(&cli->cl_mod_rpcs_waitq);
2228
2229         spin_unlock(&cli->cl_mod_rpcs_lock);
2230
2231         return 0;
2232 }
2233 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2234
2235 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2236                                struct seq_file *seq)
2237 {
2238         unsigned long mod_tot = 0, mod_cum;
2239         struct timespec64 now;
2240         int i;
2241
2242         ktime_get_real_ts64(&now);
2243
2244         spin_lock(&cli->cl_mod_rpcs_lock);
2245
2246         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2247                    (s64)now.tv_sec, now.tv_nsec);
2248         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2249                    cli->cl_mod_rpcs_in_flight);
2250
2251         seq_printf(seq, "\n\t\t\tmodify\n");
2252         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2253
2254         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2255
2256         mod_cum = 0;
2257         for (i = 0; i < OBD_HIST_MAX; i++) {
2258                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2259                 mod_cum += mod;
2260                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2261                            i, mod, pct(mod, mod_tot),
2262                            pct(mod_cum, mod_tot));
2263                 if (mod_cum == mod_tot)
2264                         break;
2265         }
2266
2267         spin_unlock(&cli->cl_mod_rpcs_lock);
2268
2269         return 0;
2270 }
2271 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2272
2273 /* The number of modify RPCs sent in parallel is limited
2274  * because the server has a finite number of slots per client to
2275  * store request result and ensure reply reconstruction when needed.
2276  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2277  * that takes into account server limit and cl_max_rpcs_in_flight
2278  * value.
2279  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2280  * one close request is allowed above the maximum.
2281  */
2282 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2283                                                  bool close_req)
2284 {
2285         bool avail;
2286
2287         /* A slot is available if
2288          * - number of modify RPCs in flight is less than the max
2289          * - it's a close RPC and no other close request is in flight
2290          */
2291         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2292                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2293
2294         return avail;
2295 }
2296
2297 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2298                                          bool close_req)
2299 {
2300         bool avail;
2301
2302         spin_lock(&cli->cl_mod_rpcs_lock);
2303         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2304         spin_unlock(&cli->cl_mod_rpcs_lock);
2305         return avail;
2306 }
2307
2308 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2309 {
2310         if (it != NULL &&
2311             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2312              it->it_op == IT_READDIR ||
2313              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2314                         return true;
2315         return false;
2316 }
2317
2318 /* Get a modify RPC slot from the obd client @cli according
2319  * to the kind of operation @opc that is going to be sent
2320  * and the intent @it of the operation if it applies.
2321  * If the maximum number of modify RPCs in flight is reached
2322  * the thread is put to sleep.
2323  * Returns the tag to be set in the request message. Tag 0
2324  * is reserved for non-modifying requests.
2325  */
2326 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2327                            struct lookup_intent *it)
2328 {
2329         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2330         bool                    close_req = false;
2331         __u16                   i, max;
2332
2333         /* read-only metadata RPCs don't consume a slot on MDT
2334          * for reply reconstruction
2335          */
2336         if (obd_skip_mod_rpc_slot(it))
2337                 return 0;
2338
2339         if (opc == MDS_CLOSE)
2340                 close_req = true;
2341
2342         do {
2343                 spin_lock(&cli->cl_mod_rpcs_lock);
2344                 max = cli->cl_max_mod_rpcs_in_flight;
2345                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2346                         /* there is a slot available */
2347                         cli->cl_mod_rpcs_in_flight++;
2348                         if (close_req)
2349                                 cli->cl_close_rpcs_in_flight++;
2350                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2351                                          cli->cl_mod_rpcs_in_flight);
2352                         /* find a free tag */
2353                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2354                                                 max + 1);
2355                         LASSERT(i < OBD_MAX_RIF_MAX);
2356                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2357                         spin_unlock(&cli->cl_mod_rpcs_lock);
2358                         /* tag 0 is reserved for non-modify RPCs */
2359
2360                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2361                                "opc %u, max %hu\n",
2362                                cli->cl_import->imp_obd->obd_name,
2363                                i + 1, opc, max);
2364
2365                         return i + 1;
2366                 }
2367                 spin_unlock(&cli->cl_mod_rpcs_lock);
2368
2369                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2370                        "opc %u, max %hu\n",
2371                        cli->cl_import->imp_obd->obd_name, opc, max);
2372
2373                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2374                                        obd_mod_rpc_slot_avail(cli, close_req),
2375                                        &lwi);
2376         } while (true);
2377 }
2378 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2379
2380 /* Put a modify RPC slot from the obd client @cli according
2381  * to the kind of operation @opc that has been sent and the
2382  * intent @it of the operation if it applies.
2383  */
2384 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2385                           struct lookup_intent *it, __u16 tag)
2386 {
2387         bool                    close_req = false;
2388
2389         if (obd_skip_mod_rpc_slot(it))
2390                 return;
2391
2392         if (opc == MDS_CLOSE)
2393                 close_req = true;
2394
2395         spin_lock(&cli->cl_mod_rpcs_lock);
2396         cli->cl_mod_rpcs_in_flight--;
2397         if (close_req)
2398                 cli->cl_close_rpcs_in_flight--;
2399         /* release the tag in the bitmap */
2400         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2401         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2402         spin_unlock(&cli->cl_mod_rpcs_lock);
2403         wake_up(&cli->cl_mod_rpcs_waitq);
2404 }
2405 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2406