Whamcloud - gitweb
c5404d85b5467ecc28db8893f5a5733cc9173553
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
53
54 static struct kmem_cache *obd_device_cachep;
55
56 static struct workqueue_struct *zombie_wq;
57
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks, int debug_level);
62
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165 #ifdef HAVE_SERVER_SUPPORT
166         if (type->typ_sym_filter)
167                 type->typ_debugfs_entry = NULL;
168 #endif
169         debugfs_remove_recursive(type->typ_debugfs_entry);
170         type->typ_debugfs_entry = NULL;
171
172         if (type->typ_lu)
173                 lu_device_type_fini(type->typ_lu);
174
175         spin_lock(&obd_types_lock);
176         list_del(&type->typ_chain);
177         spin_unlock(&obd_types_lock);
178
179         if (type->typ_name) {
180 #ifdef CONFIG_PROC_FS
181                 if (type->typ_procroot)
182                         remove_proc_subtree(type->typ_name, proc_lustre_root);
183 #endif
184                 OBD_FREE(type->typ_name, strlen(type->typ_name) + 1);
185         }
186         if (type->typ_md_ops)
187                 OBD_FREE_PTR(type->typ_md_ops);
188         if (type->typ_dt_ops)
189                 OBD_FREE_PTR(type->typ_dt_ops);
190
191         OBD_FREE(type, sizeof(*type));
192 }
193
194 static struct kobj_type class_ktype = {
195         .sysfs_ops      = &lustre_sysfs_ops,
196         .release        = class_sysfs_release,
197 };
198
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create an obd_type that only carries directory entries for \a name.
 *
 * Allocates an obd_type, registers its kobject in lustre_kset, creates a
 * debugfs directory of the same name and marks the type with
 * typ_sym_filter.  When \a enable_proc is set a procfs directory is
 * registered as well; failure to create it is logged but not fatal.
 *
 * \param[in] name         name of the kobject and directories
 * \param[in] enable_proc  also register a procfs directory
 *
 * \retval type            pointer to the new obd_type
 * \retval ERR_PTR(errno)  on error (-EEXIST if \a name is already in
 *                         lustre_kset, -ENOMEM on allocation failure)
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        struct kobject *kobj;
        int rc;

        kobj = kset_find_obj(lustre_kset, name);
        if (kobj) {
                /* drop the reference taken by kset_find_obj() */
                kobject_put(kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        INIT_LIST_HEAD(&type->typ_chain);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* A failed kobject_init_and_add() still leaves an
                 * initialized kobject behind; the final put runs
                 * class_sysfs_release(), which frees \a type. */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        if (IS_ERR_OR_NULL(symlink)) {
                rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        /* best effort only: keep going without procfs */
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
248
249 #define CLASS_MAX_NAME 1024
250
251 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
252                         bool enable_proc, struct lprocfs_vars *vars,
253                         const char *name, struct lu_device_type *ldt)
254 {
255         struct obd_type *type;
256         int rc;
257
258         ENTRY;
259         /* sanity check */
260         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
261
262         if (class_search_type(name)) {
263 #ifdef HAVE_SERVER_SUPPORT
264                 if (strcmp(name, LUSTRE_LOV_NAME) == 0 ||
265                     strcmp(name, LUSTRE_OSC_NAME) == 0) {
266                         struct kobject *kobj;
267
268                         kobj = kset_find_obj(lustre_kset, name);
269                         if (kobj) {
270                                 type = container_of(kobj, struct obd_type,
271                                                     typ_kobj);
272                                 goto dir_exist;
273                         }
274                 }
275 #endif /* HAVE_SERVER_SUPPORT */
276                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
277                 RETURN(-EEXIST);
278         }
279
280         OBD_ALLOC(type, sizeof(*type));
281         if (type == NULL)
282                 RETURN(-ENOMEM);
283
284         INIT_LIST_HEAD(&type->typ_chain);
285         type->typ_kobj.kset = lustre_kset;
286         kobject_init(&type->typ_kobj, &class_ktype);
287 #ifdef HAVE_SERVER_SUPPORT
288 dir_exist:
289 #endif /* HAVE_SERVER_SUPPORT */
290         OBD_ALLOC_PTR(type->typ_dt_ops);
291         OBD_ALLOC_PTR(type->typ_md_ops);
292         OBD_ALLOC(type->typ_name, strlen(name) + 1);
293
294         if (type->typ_dt_ops == NULL ||
295             type->typ_md_ops == NULL ||
296             type->typ_name == NULL)
297                 GOTO (failed, rc = -ENOMEM);
298
299         *(type->typ_dt_ops) = *dt_ops;
300         /* md_ops is optional */
301         if (md_ops)
302                 *(type->typ_md_ops) = *md_ops;
303         strcpy(type->typ_name, name);
304         spin_lock_init(&type->obd_type_lock);
305
306 #ifdef HAVE_SERVER_SUPPORT
307         if (type->typ_sym_filter)
308                 goto setup_ldt;
309 #endif
310 #ifdef CONFIG_PROC_FS
311         if (enable_proc && !type->typ_procroot) {
312                 type->typ_procroot = lprocfs_register(type->typ_name,
313                                                       proc_lustre_root,
314                                                       NULL, type);
315                 if (IS_ERR(type->typ_procroot)) {
316                         rc = PTR_ERR(type->typ_procroot);
317                         type->typ_procroot = NULL;
318                         GOTO(failed, rc);
319                 }
320         }
321 #endif
322         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
323                                                     vars, type);
324         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
325                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
326                                              : -ENOMEM;
327                 type->typ_debugfs_entry = NULL;
328                 GOTO(failed, rc);
329         }
330
331         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
332         if (rc)
333                 GOTO(failed, rc);
334 #ifdef HAVE_SERVER_SUPPORT
335 setup_ldt:
336 #endif
337         if (ldt) {
338                 type->typ_lu = ldt;
339                 rc = lu_device_type_init(ldt);
340                 if (rc)
341                         GOTO(failed, rc);
342         }
343
344         spin_lock(&obd_types_lock);
345         list_add(&type->typ_chain, &obd_types);
346         spin_unlock(&obd_types_lock);
347
348         RETURN(0);
349
350 failed:
351         kobject_put(&type->typ_kobj);
352
353         RETURN(rc);
354 }
355 EXPORT_SYMBOL(class_register_type);
356
357 int class_unregister_type(const char *name)
358 {
359         struct obd_type *type = class_search_type(name);
360         ENTRY;
361
362         if (!type) {
363                 CERROR("unknown obd type\n");
364                 RETURN(-EINVAL);
365         }
366
367         if (type->typ_refcnt) {
368                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
369                 /* This is a bad situation, let's make the best of it */
370                 /* Remove ops, but leave the name for debugging */
371                 OBD_FREE_PTR(type->typ_dt_ops);
372                 OBD_FREE_PTR(type->typ_md_ops);
373                 RETURN(-EBUSY);
374         }
375
376         kobject_put(&type->typ_kobj);
377
378         RETURN(0);
379 } /* class_unregister_type */
380 EXPORT_SYMBOL(class_unregister_type);
381
382 /**
383  * Create a new obd device.
384  *
385  * Allocate the new obd_device and initialize it.
386  *
387  * \param[in] type_name obd device type string.
388  * \param[in] name      obd device name.
389  * \param[in] uuid      obd device UUID
390  *
391  * \retval newdev         pointer to created obd_device
392  * \retval ERR_PTR(errno) on error
393  */
394 struct obd_device *class_newdev(const char *type_name, const char *name,
395                                 const char *uuid)
396 {
397         struct obd_device *newdev;
398         struct obd_type *type = NULL;
399         ENTRY;
400
401         if (strlen(name) >= MAX_OBD_NAME) {
402                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
403                 RETURN(ERR_PTR(-EINVAL));
404         }
405
406         type = class_get_type(type_name);
407         if (type == NULL){
408                 CERROR("OBD: unknown type: %s\n", type_name);
409                 RETURN(ERR_PTR(-ENODEV));
410         }
411
412         newdev = obd_device_alloc();
413         if (newdev == NULL) {
414                 class_put_type(type);
415                 RETURN(ERR_PTR(-ENOMEM));
416         }
417         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
418         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
419         newdev->obd_type = type;
420         newdev->obd_minor = -1;
421
422         rwlock_init(&newdev->obd_pool_lock);
423         newdev->obd_pool_limit = 0;
424         newdev->obd_pool_slv = 0;
425
426         INIT_LIST_HEAD(&newdev->obd_exports);
427         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
428         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
429         INIT_LIST_HEAD(&newdev->obd_exports_timed);
430         INIT_LIST_HEAD(&newdev->obd_nid_stats);
431         spin_lock_init(&newdev->obd_nid_lock);
432         spin_lock_init(&newdev->obd_dev_lock);
433         mutex_init(&newdev->obd_dev_mutex);
434         spin_lock_init(&newdev->obd_osfs_lock);
435         /* newdev->obd_osfs_age must be set to a value in the distant
436          * past to guarantee a fresh statfs is fetched on mount. */
437         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
438
439         /* XXX belongs in setup not attach  */
440         init_rwsem(&newdev->obd_observer_link_sem);
441         /* recovery data */
442         spin_lock_init(&newdev->obd_recovery_task_lock);
443         init_waitqueue_head(&newdev->obd_next_transno_waitq);
444         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
445         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
446         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
447         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
448         INIT_LIST_HEAD(&newdev->obd_evict_list);
449         INIT_LIST_HEAD(&newdev->obd_lwp_list);
450
451         llog_group_init(&newdev->obd_olg);
452         /* Detach drops this */
453         atomic_set(&newdev->obd_refcount, 1);
454         lu_ref_init(&newdev->obd_reference);
455         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
456
457         newdev->obd_conn_inprogress = 0;
458
459         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
460
461         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
462                newdev->obd_name, newdev);
463
464         return newdev;
465 }
466
467 /**
468  * Free obd device.
469  *
470  * \param[in] obd obd_device to be freed
471  *
472  * \retval none
473  */
474 void class_free_dev(struct obd_device *obd)
475 {
476         struct obd_type *obd_type = obd->obd_type;
477
478         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
479                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
480         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
481                  "obd %p != obd_devs[%d] %p\n",
482                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
483         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
484                  "obd_refcount should be 0, not %d\n",
485                  atomic_read(&obd->obd_refcount));
486         LASSERT(obd_type != NULL);
487
488         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
489                obd->obd_name, obd->obd_type->typ_name);
490
491         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
492                          obd->obd_name, obd->obd_uuid.uuid);
493         if (obd->obd_stopping) {
494                 int err;
495
496                 /* If we're not stopping, we were never set up */
497                 err = obd_cleanup(obd);
498                 if (err)
499                         CERROR("Cleanup %s returned %d\n",
500                                 obd->obd_name, err);
501         }
502
503         obd_device_free(obd);
504
505         class_put_type(obd_type);
506 }
507
508 /**
509  * Unregister obd device.
510  *
511  * Free slot in obd_dev[] used by \a obd.
512  *
513  * \param[in] new_obd obd_device to be unregistered
514  *
515  * \retval none
516  */
517 void class_unregister_device(struct obd_device *obd)
518 {
519         write_lock(&obd_dev_lock);
520         if (obd->obd_minor >= 0) {
521                 LASSERT(obd_devs[obd->obd_minor] == obd);
522                 obd_devs[obd->obd_minor] = NULL;
523                 obd->obd_minor = -1;
524         }
525         write_unlock(&obd_dev_lock);
526 }
527
528 /**
529  * Register obd device.
530  *
531  * Find free slot in obd_devs[], fills it with \a new_obd.
532  *
533  * \param[in] new_obd obd_device to be registered
534  *
535  * \retval 0          success
536  * \retval -EEXIST    device with this name is registered
537  * \retval -EOVERFLOW obd_devs[] is full
538  */
539 int class_register_device(struct obd_device *new_obd)
540 {
541         int ret = 0;
542         int i;
543         int new_obd_minor = 0;
544         bool minor_assign = false;
545         bool retried = false;
546
547 again:
548         write_lock(&obd_dev_lock);
549         for (i = 0; i < class_devno_max(); i++) {
550                 struct obd_device *obd = class_num2obd(i);
551
552                 if (obd != NULL &&
553                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
554
555                         if (!retried) {
556                                 write_unlock(&obd_dev_lock);
557
558                                 /* the obd_device could be waited to be
559                                  * destroyed by the "obd_zombie_impexp_thread".
560                                  */
561                                 obd_zombie_barrier();
562                                 retried = true;
563                                 goto again;
564                         }
565
566                         CERROR("%s: already exists, won't add\n",
567                                obd->obd_name);
568                         /* in case we found a free slot before duplicate */
569                         minor_assign = false;
570                         ret = -EEXIST;
571                         break;
572                 }
573                 if (!minor_assign && obd == NULL) {
574                         new_obd_minor = i;
575                         minor_assign = true;
576                 }
577         }
578
579         if (minor_assign) {
580                 new_obd->obd_minor = new_obd_minor;
581                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
582                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
583                 obd_devs[new_obd_minor] = new_obd;
584         } else {
585                 if (ret == 0) {
586                         ret = -EOVERFLOW;
587                         CERROR("%s: all %u/%u devices used, increase "
588                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
589                                i, class_devno_max(), ret);
590                 }
591         }
592         write_unlock(&obd_dev_lock);
593
594         RETURN(ret);
595 }
596
597 static int class_name2dev_nolock(const char *name)
598 {
599         int i;
600
601         if (!name)
602                 return -1;
603
604         for (i = 0; i < class_devno_max(); i++) {
605                 struct obd_device *obd = class_num2obd(i);
606
607                 if (obd && strcmp(name, obd->obd_name) == 0) {
608                         /* Make sure we finished attaching before we give
609                            out any references */
610                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
611                         if (obd->obd_attached) {
612                                 return i;
613                         }
614                         break;
615                 }
616         }
617
618         return -1;
619 }
620
621 int class_name2dev(const char *name)
622 {
623         int i;
624
625         if (!name)
626                 return -1;
627
628         read_lock(&obd_dev_lock);
629         i = class_name2dev_nolock(name);
630         read_unlock(&obd_dev_lock);
631
632         return i;
633 }
634 EXPORT_SYMBOL(class_name2dev);
635
636 struct obd_device *class_name2obd(const char *name)
637 {
638         int dev = class_name2dev(name);
639
640         if (dev < 0 || dev > class_devno_max())
641                 return NULL;
642         return class_num2obd(dev);
643 }
644 EXPORT_SYMBOL(class_name2obd);
645
646 int class_uuid2dev_nolock(struct obd_uuid *uuid)
647 {
648         int i;
649
650         for (i = 0; i < class_devno_max(); i++) {
651                 struct obd_device *obd = class_num2obd(i);
652
653                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
654                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
655                         return i;
656                 }
657         }
658
659         return -1;
660 }
661
662 int class_uuid2dev(struct obd_uuid *uuid)
663 {
664         int i;
665
666         read_lock(&obd_dev_lock);
667         i = class_uuid2dev_nolock(uuid);
668         read_unlock(&obd_dev_lock);
669
670         return i;
671 }
672 EXPORT_SYMBOL(class_uuid2dev);
673
674 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
675 {
676         int dev = class_uuid2dev(uuid);
677         if (dev < 0)
678                 return NULL;
679         return class_num2obd(dev);
680 }
681 EXPORT_SYMBOL(class_uuid2obd);
682
683 /**
684  * Get obd device from ::obd_devs[]
685  *
686  * \param num [in] array index
687  *
688  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
689  *         otherwise return the obd device there.
690  */
691 struct obd_device *class_num2obd(int num)
692 {
693         struct obd_device *obd = NULL;
694
695         if (num < class_devno_max()) {
696                 obd = obd_devs[num];
697                 if (obd == NULL)
698                         return NULL;
699
700                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
701                          "%p obd_magic %08x != %08x\n",
702                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
703                 LASSERTF(obd->obd_minor == num,
704                          "%p obd_minor %0d != %0d\n",
705                          obd, obd->obd_minor, num);
706         }
707
708         return obd;
709 }
710
711 /**
712  * Find obd in obd_dev[] by name or uuid.
713  *
714  * Increment obd's refcount if found.
715  *
716  * \param[in] str obd name or uuid
717  *
718  * \retval NULL    if not found
719  * \retval target  pointer to found obd_device
720  */
721 struct obd_device *class_dev_by_str(const char *str)
722 {
723         struct obd_device *target = NULL;
724         struct obd_uuid tgtuuid;
725         int rc;
726
727         obd_str2uuid(&tgtuuid, str);
728
729         read_lock(&obd_dev_lock);
730         rc = class_uuid2dev_nolock(&tgtuuid);
731         if (rc < 0)
732                 rc = class_name2dev_nolock(str);
733
734         if (rc >= 0)
735                 target = class_num2obd(rc);
736
737         if (target != NULL)
738                 class_incref(target, "find", current);
739         read_unlock(&obd_dev_lock);
740
741         RETURN(target);
742 }
743 EXPORT_SYMBOL(class_dev_by_str);
744
745 /**
746  * Get obd devices count. Device in any
747  *    state are counted
748  * \retval obd device count
749  */
750 int get_devices_count(void)
751 {
752         int index, max_index = class_devno_max(), dev_count = 0;
753
754         read_lock(&obd_dev_lock);
755         for (index = 0; index <= max_index; index++) {
756                 struct obd_device *obd = class_num2obd(index);
757                 if (obd != NULL)
758                         dev_count++;
759         }
760         read_unlock(&obd_dev_lock);
761
762         return dev_count;
763 }
764 EXPORT_SYMBOL(get_devices_count);
765
766 void class_obd_list(void)
767 {
768         char *status;
769         int i;
770
771         read_lock(&obd_dev_lock);
772         for (i = 0; i < class_devno_max(); i++) {
773                 struct obd_device *obd = class_num2obd(i);
774
775                 if (obd == NULL)
776                         continue;
777                 if (obd->obd_stopping)
778                         status = "ST";
779                 else if (obd->obd_set_up)
780                         status = "UP";
781                 else if (obd->obd_attached)
782                         status = "AT";
783                 else
784                         status = "--";
785                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
786                          i, status, obd->obd_type->typ_name,
787                          obd->obd_name, obd->obd_uuid.uuid,
788                          atomic_read(&obd->obd_refcount));
789         }
790         read_unlock(&obd_dev_lock);
791         return;
792 }
793
794 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
795    specified, then only the client with that uuid is returned,
796    otherwise any client connected to the tgt is returned. */
797 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
798                                           const char * typ_name,
799                                           struct obd_uuid *grp_uuid)
800 {
801         int i;
802
803         read_lock(&obd_dev_lock);
804         for (i = 0; i < class_devno_max(); i++) {
805                 struct obd_device *obd = class_num2obd(i);
806
807                 if (obd == NULL)
808                         continue;
809                 if ((strncmp(obd->obd_type->typ_name, typ_name,
810                              strlen(typ_name)) == 0)) {
811                         if (obd_uuid_equals(tgt_uuid,
812                                             &obd->u.cli.cl_target_uuid) &&
813                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
814                                                          &obd->obd_uuid) : 1)) {
815                                 read_unlock(&obd_dev_lock);
816                                 return obd;
817                         }
818                 }
819         }
820         read_unlock(&obd_dev_lock);
821
822         return NULL;
823 }
824 EXPORT_SYMBOL(class_find_client_obd);
825
826 /* Iterate the obd_device list looking devices have grp_uuid. Start
827    searching at *next, and if a device is found, the next index to look
828    at is saved in *next. If next is NULL, then the first matching device
829    will always be returned. */
830 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
831 {
832         int i;
833
834         if (next == NULL)
835                 i = 0;
836         else if (*next >= 0 && *next < class_devno_max())
837                 i = *next;
838         else
839                 return NULL;
840
841         read_lock(&obd_dev_lock);
842         for (; i < class_devno_max(); i++) {
843                 struct obd_device *obd = class_num2obd(i);
844
845                 if (obd == NULL)
846                         continue;
847                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
848                         if (next != NULL)
849                                 *next = i+1;
850                         read_unlock(&obd_dev_lock);
851                         return obd;
852                 }
853         }
854         read_unlock(&obd_dev_lock);
855
856         return NULL;
857 }
858 EXPORT_SYMBOL(class_devices_in_group);
859
860 /**
861  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
862  * adjust sptlrpc settings accordingly.
863  */
864 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
865 {
866         struct obd_device  *obd;
867         const char         *type;
868         int                 i, rc = 0, rc2;
869
870         LASSERT(namelen > 0);
871
872         read_lock(&obd_dev_lock);
873         for (i = 0; i < class_devno_max(); i++) {
874                 obd = class_num2obd(i);
875
876                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
877                         continue;
878
879                 /* only notify mdc, osc, osp, lwp, mdt, ost
880                  * because only these have a -sptlrpc llog */
881                 type = obd->obd_type->typ_name;
882                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
883                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
884                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
885                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
886                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
887                     strcmp(type, LUSTRE_OST_NAME) != 0)
888                         continue;
889
890                 if (strncmp(obd->obd_name, fsname, namelen))
891                         continue;
892
893                 class_incref(obd, __FUNCTION__, obd);
894                 read_unlock(&obd_dev_lock);
895                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
896                                          sizeof(KEY_SPTLRPC_CONF),
897                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
898                 rc = rc ? rc : rc2;
899                 class_decref(obd, __FUNCTION__, obd);
900                 read_lock(&obd_dev_lock);
901         }
902         read_unlock(&obd_dev_lock);
903         return rc;
904 }
905 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
906
907 void obd_cleanup_caches(void)
908 {
909         ENTRY;
910         if (obd_device_cachep) {
911                 kmem_cache_destroy(obd_device_cachep);
912                 obd_device_cachep = NULL;
913         }
914
915         EXIT;
916 }
917
918 int obd_init_caches(void)
919 {
920         int rc;
921         ENTRY;
922
923         LASSERT(obd_device_cachep == NULL);
924         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
925                                               sizeof(struct obd_device),
926                                               0, 0, NULL);
927         if (!obd_device_cachep)
928                 GOTO(out, rc = -ENOMEM);
929
930         RETURN(0);
931 out:
932         obd_cleanup_caches();
933         RETURN(rc);
934 }
935
936 /* map connection to client */
937 struct obd_export *class_conn2export(struct lustre_handle *conn)
938 {
939         struct obd_export *export;
940         ENTRY;
941
942         if (!conn) {
943                 CDEBUG(D_CACHE, "looking for null handle\n");
944                 RETURN(NULL);
945         }
946
947         if (conn->cookie == -1) {  /* this means assign a new connection */
948                 CDEBUG(D_CACHE, "want a new connection\n");
949                 RETURN(NULL);
950         }
951
952         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
953         export = class_handle2object(conn->cookie, NULL);
954         RETURN(export);
955 }
956 EXPORT_SYMBOL(class_conn2export);
957
958 struct obd_device *class_exp2obd(struct obd_export *exp)
959 {
960         if (exp)
961                 return exp->exp_obd;
962         return NULL;
963 }
964 EXPORT_SYMBOL(class_exp2obd);
965
966 struct obd_import *class_exp2cliimp(struct obd_export *exp)
967 {
968         struct obd_device *obd = exp->exp_obd;
969         if (obd == NULL)
970                 return NULL;
971         return obd->u.cli.cl_import;
972 }
973 EXPORT_SYMBOL(class_exp2cliimp);
974
975 /* Export management functions */
976 static void class_export_destroy(struct obd_export *exp)
977 {
978         struct obd_device *obd = exp->exp_obd;
979         ENTRY;
980
981         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
982         LASSERT(obd != NULL);
983
984         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
985                exp->exp_client_uuid.uuid, obd->obd_name);
986
987         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
988         if (exp->exp_connection)
989                 ptlrpc_put_connection_superhack(exp->exp_connection);
990
991         LASSERT(list_empty(&exp->exp_outstanding_replies));
992         LASSERT(list_empty(&exp->exp_uncommitted_replies));
993         LASSERT(list_empty(&exp->exp_req_replay_queue));
994         LASSERT(list_empty(&exp->exp_hp_rpcs));
995         obd_destroy_export(exp);
996         /* self export doesn't hold a reference to an obd, although it
997          * exists until freeing of the obd */
998         if (exp != obd->obd_self_export)
999                 class_decref(obd, "export", exp);
1000
1001         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1002         EXIT;
1003 }
1004
/* portals_handle_ops::hop_addref callback: take a reference on the export
 * passed as an untyped pointer. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
1009
1010 static struct portals_handle_ops export_handle_ops = {
1011         .hop_addref = export_handle_addref,
1012         .hop_free   = NULL,
1013 };
1014
1015 struct obd_export *class_export_get(struct obd_export *exp)
1016 {
1017         atomic_inc(&exp->exp_refcount);
1018         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1019                atomic_read(&exp->exp_refcount));
1020         return exp;
1021 }
1022 EXPORT_SYMBOL(class_export_get);
1023
1024 void class_export_put(struct obd_export *exp)
1025 {
1026         LASSERT(exp != NULL);
1027         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1028         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1029                atomic_read(&exp->exp_refcount) - 1);
1030
1031         if (atomic_dec_and_test(&exp->exp_refcount)) {
1032                 struct obd_device *obd = exp->exp_obd;
1033
1034                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1035                        exp, exp->exp_client_uuid.uuid);
1036
1037                 /* release nid stat refererence */
1038                 lprocfs_exp_cleanup(exp);
1039
1040                 if (exp == obd->obd_self_export) {
1041                         /* self export should be destroyed without
1042                          * zombie thread as it doesn't hold a
1043                          * reference to obd and doesn't hold any
1044                          * resources */
1045                         class_export_destroy(exp);
1046                         /* self export is destroyed, no class
1047                          * references exist and it is safe to free
1048                          * obd */
1049                         class_free_dev(obd);
1050                 } else {
1051                         LASSERT(!list_empty(&exp->exp_obd_chain));
1052                         obd_zombie_export_add(exp);
1053                 }
1054
1055         }
1056 }
1057 EXPORT_SYMBOL(class_export_put);
1058
1059 static void obd_zombie_exp_cull(struct work_struct *ws)
1060 {
1061         struct obd_export *export;
1062
1063         export = container_of(ws, struct obd_export, exp_zombie_work);
1064         class_export_destroy(export);
1065 }
1066
1067 /* Creates a new export, adds it to the hash table, and returns a
1068  * pointer to it. The refcount is 2: one for the hash reference, and
1069  * one for the pointer returned by this function. */
1070 struct obd_export *__class_new_export(struct obd_device *obd,
1071                                       struct obd_uuid *cluuid, bool is_self)
1072 {
1073         struct obd_export *export;
1074         struct cfs_hash *hash = NULL;
1075         int rc = 0;
1076         ENTRY;
1077
1078         OBD_ALLOC_PTR(export);
1079         if (!export)
1080                 return ERR_PTR(-ENOMEM);
1081
1082         export->exp_conn_cnt = 0;
1083         export->exp_lock_hash = NULL;
1084         export->exp_flock_hash = NULL;
1085         /* 2 = class_handle_hash + last */
1086         atomic_set(&export->exp_refcount, 2);
1087         atomic_set(&export->exp_rpc_count, 0);
1088         atomic_set(&export->exp_cb_count, 0);
1089         atomic_set(&export->exp_locks_count, 0);
1090 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1091         INIT_LIST_HEAD(&export->exp_locks_list);
1092         spin_lock_init(&export->exp_locks_list_guard);
1093 #endif
1094         atomic_set(&export->exp_replay_count, 0);
1095         export->exp_obd = obd;
1096         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1097         spin_lock_init(&export->exp_uncommitted_replies_lock);
1098         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1099         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1100         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1101         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1102         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1103         class_handle_hash(&export->exp_handle, &export_handle_ops);
1104         export->exp_last_request_time = ktime_get_real_seconds();
1105         spin_lock_init(&export->exp_lock);
1106         spin_lock_init(&export->exp_rpc_lock);
1107         INIT_HLIST_NODE(&export->exp_uuid_hash);
1108         INIT_HLIST_NODE(&export->exp_nid_hash);
1109         INIT_HLIST_NODE(&export->exp_gen_hash);
1110         spin_lock_init(&export->exp_bl_list_lock);
1111         INIT_LIST_HEAD(&export->exp_bl_list);
1112         INIT_LIST_HEAD(&export->exp_stale_list);
1113         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1114
1115         export->exp_sp_peer = LUSTRE_SP_ANY;
1116         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1117         export->exp_client_uuid = *cluuid;
1118         obd_init_export(export);
1119
1120         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1121                 spin_lock(&obd->obd_dev_lock);
1122                 /* shouldn't happen, but might race */
1123                 if (obd->obd_stopping)
1124                         GOTO(exit_unlock, rc = -ENODEV);
1125
1126                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1127                 if (hash == NULL)
1128                         GOTO(exit_unlock, rc = -ENODEV);
1129                 spin_unlock(&obd->obd_dev_lock);
1130
1131                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1132                 if (rc != 0) {
1133                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1134                                       obd->obd_name, cluuid->uuid, rc);
1135                         GOTO(exit_err, rc = -EALREADY);
1136                 }
1137         }
1138
1139         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1140         spin_lock(&obd->obd_dev_lock);
1141         if (obd->obd_stopping) {
1142                 if (hash)
1143                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1144                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1145         }
1146
1147         if (!is_self) {
1148                 class_incref(obd, "export", export);
1149                 list_add_tail(&export->exp_obd_chain_timed,
1150                               &obd->obd_exports_timed);
1151                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1152                 obd->obd_num_exports++;
1153         } else {
1154                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1155                 INIT_LIST_HEAD(&export->exp_obd_chain);
1156         }
1157         spin_unlock(&obd->obd_dev_lock);
1158         if (hash)
1159                 cfs_hash_putref(hash);
1160         RETURN(export);
1161
1162 exit_unlock:
1163         spin_unlock(&obd->obd_dev_lock);
1164 exit_err:
1165         if (hash)
1166                 cfs_hash_putref(hash);
1167         class_handle_unhash(&export->exp_handle);
1168         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1169         obd_destroy_export(export);
1170         OBD_FREE_PTR(export);
1171         return ERR_PTR(rc);
1172 }
1173
1174 struct obd_export *class_new_export(struct obd_device *obd,
1175                                     struct obd_uuid *uuid)
1176 {
1177         return __class_new_export(obd, uuid, false);
1178 }
1179 EXPORT_SYMBOL(class_new_export);
1180
1181 struct obd_export *class_new_export_self(struct obd_device *obd,
1182                                          struct obd_uuid *uuid)
1183 {
1184         return __class_new_export(obd, uuid, true);
1185 }
1186
1187 void class_unlink_export(struct obd_export *exp)
1188 {
1189         class_handle_unhash(&exp->exp_handle);
1190
1191         if (exp->exp_obd->obd_self_export == exp) {
1192                 class_export_put(exp);
1193                 return;
1194         }
1195
1196         spin_lock(&exp->exp_obd->obd_dev_lock);
1197         /* delete an uuid-export hashitem from hashtables */
1198         if (!hlist_unhashed(&exp->exp_uuid_hash))
1199                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1200                              &exp->exp_client_uuid,
1201                              &exp->exp_uuid_hash);
1202
1203 #ifdef HAVE_SERVER_SUPPORT
1204         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1205                 struct tg_export_data   *ted = &exp->exp_target_data;
1206                 struct cfs_hash         *hash;
1207
1208                 /* Because obd_gen_hash will not be released until
1209                  * class_cleanup(), so hash should never be NULL here */
1210                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1211                 LASSERT(hash != NULL);
1212                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1213                              &exp->exp_gen_hash);
1214                 cfs_hash_putref(hash);
1215         }
1216 #endif /* HAVE_SERVER_SUPPORT */
1217
1218         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1219         list_del_init(&exp->exp_obd_chain_timed);
1220         exp->exp_obd->obd_num_exports--;
1221         spin_unlock(&exp->exp_obd->obd_dev_lock);
1222         atomic_inc(&obd_stale_export_num);
1223
1224         /* A reference is kept by obd_stale_exports list */
1225         obd_stale_export_put(exp);
1226 }
1227 EXPORT_SYMBOL(class_unlink_export);
1228
1229 /* Import management functions */
1230 static void obd_zombie_import_free(struct obd_import *imp)
1231 {
1232         ENTRY;
1233
1234         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1235                 imp->imp_obd->obd_name);
1236
1237         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1238
1239         ptlrpc_put_connection_superhack(imp->imp_connection);
1240
1241         while (!list_empty(&imp->imp_conn_list)) {
1242                 struct obd_import_conn *imp_conn;
1243
1244                 imp_conn = list_entry(imp->imp_conn_list.next,
1245                                       struct obd_import_conn, oic_item);
1246                 list_del_init(&imp_conn->oic_item);
1247                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1248                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1249         }
1250
1251         LASSERT(imp->imp_sec == NULL);
1252         class_decref(imp->imp_obd, "import", imp);
1253         OBD_FREE_PTR(imp);
1254         EXIT;
1255 }
1256
1257 struct obd_import *class_import_get(struct obd_import *import)
1258 {
1259         atomic_inc(&import->imp_refcount);
1260         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1261                atomic_read(&import->imp_refcount),
1262                import->imp_obd->obd_name);
1263         return import;
1264 }
1265 EXPORT_SYMBOL(class_import_get);
1266
1267 void class_import_put(struct obd_import *imp)
1268 {
1269         ENTRY;
1270
1271         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1272
1273         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1274                atomic_read(&imp->imp_refcount) - 1,
1275                imp->imp_obd->obd_name);
1276
1277         if (atomic_dec_and_test(&imp->imp_refcount)) {
1278                 CDEBUG(D_INFO, "final put import %p\n", imp);
1279                 obd_zombie_import_add(imp);
1280         }
1281
1282         /* catch possible import put race */
1283         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1284         EXIT;
1285 }
1286 EXPORT_SYMBOL(class_import_put);
1287
1288 static void init_imp_at(struct imp_at *at) {
1289         int i;
1290         at_init(&at->iat_net_latency, 0, 0);
1291         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1292                 /* max service estimates are tracked on the server side, so
1293                    don't use the AT history here, just use the last reported
1294                    val. (But keep hist for proc histogram, worst_ever) */
1295                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1296                         AT_FLG_NOHIST);
1297         }
1298 }
1299
1300 static void obd_zombie_imp_cull(struct work_struct *ws)
1301 {
1302         struct obd_import *import;
1303
1304         import = container_of(ws, struct obd_import, imp_zombie_work);
1305         obd_zombie_import_free(import);
1306 }
1307
1308 struct obd_import *class_new_import(struct obd_device *obd)
1309 {
1310         struct obd_import *imp;
1311         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1312
1313         OBD_ALLOC(imp, sizeof(*imp));
1314         if (imp == NULL)
1315                 return NULL;
1316
1317         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1318         INIT_LIST_HEAD(&imp->imp_replay_list);
1319         INIT_LIST_HEAD(&imp->imp_sending_list);
1320         INIT_LIST_HEAD(&imp->imp_delayed_list);
1321         INIT_LIST_HEAD(&imp->imp_committed_list);
1322         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1323         imp->imp_known_replied_xid = 0;
1324         imp->imp_replay_cursor = &imp->imp_committed_list;
1325         spin_lock_init(&imp->imp_lock);
1326         imp->imp_last_success_conn = 0;
1327         imp->imp_state = LUSTRE_IMP_NEW;
1328         imp->imp_obd = class_incref(obd, "import", imp);
1329         mutex_init(&imp->imp_sec_mutex);
1330         init_waitqueue_head(&imp->imp_recovery_waitq);
1331         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1332
1333         if (curr_pid_ns->child_reaper)
1334                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1335         else
1336                 imp->imp_sec_refpid = 1;
1337
1338         atomic_set(&imp->imp_refcount, 2);
1339         atomic_set(&imp->imp_unregistering, 0);
1340         atomic_set(&imp->imp_inflight, 0);
1341         atomic_set(&imp->imp_replay_inflight, 0);
1342         atomic_set(&imp->imp_inval_count, 0);
1343         INIT_LIST_HEAD(&imp->imp_conn_list);
1344         init_imp_at(&imp->imp_at);
1345
1346         /* the default magic is V2, will be used in connect RPC, and
1347          * then adjusted according to the flags in request/reply. */
1348         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1349
1350         return imp;
1351 }
1352 EXPORT_SYMBOL(class_new_import);
1353
1354 void class_destroy_import(struct obd_import *import)
1355 {
1356         LASSERT(import != NULL);
1357         LASSERT(import != LP_POISON);
1358
1359         spin_lock(&import->imp_lock);
1360         import->imp_generation++;
1361         spin_unlock(&import->imp_lock);
1362         class_import_put(import);
1363 }
1364 EXPORT_SYMBOL(class_destroy_import);
1365
1366 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1367
1368 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1369 {
1370         spin_lock(&exp->exp_locks_list_guard);
1371
1372         LASSERT(lock->l_exp_refs_nr >= 0);
1373
1374         if (lock->l_exp_refs_target != NULL &&
1375             lock->l_exp_refs_target != exp) {
1376                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1377                               exp, lock, lock->l_exp_refs_target);
1378         }
1379         if ((lock->l_exp_refs_nr ++) == 0) {
1380                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1381                 lock->l_exp_refs_target = exp;
1382         }
1383         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1384                lock, exp, lock->l_exp_refs_nr);
1385         spin_unlock(&exp->exp_locks_list_guard);
1386 }
1387 EXPORT_SYMBOL(__class_export_add_lock_ref);
1388
1389 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1390 {
1391         spin_lock(&exp->exp_locks_list_guard);
1392         LASSERT(lock->l_exp_refs_nr > 0);
1393         if (lock->l_exp_refs_target != exp) {
1394                 LCONSOLE_WARN("lock %p, "
1395                               "mismatching export pointers: %p, %p\n",
1396                               lock, lock->l_exp_refs_target, exp);
1397         }
1398         if (-- lock->l_exp_refs_nr == 0) {
1399                 list_del_init(&lock->l_exp_refs_link);
1400                 lock->l_exp_refs_target = NULL;
1401         }
1402         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1403                lock, exp, lock->l_exp_refs_nr);
1404         spin_unlock(&exp->exp_locks_list_guard);
1405 }
1406 EXPORT_SYMBOL(__class_export_del_lock_ref);
1407 #endif
1408
1409 /* A connection defines an export context in which preallocation can
1410    be managed. This releases the export pointer reference, and returns
1411    the export handle, so the export refcount is 1 when this function
1412    returns. */
1413 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1414                   struct obd_uuid *cluuid)
1415 {
1416         struct obd_export *export;
1417         LASSERT(conn != NULL);
1418         LASSERT(obd != NULL);
1419         LASSERT(cluuid != NULL);
1420         ENTRY;
1421
1422         export = class_new_export(obd, cluuid);
1423         if (IS_ERR(export))
1424                 RETURN(PTR_ERR(export));
1425
1426         conn->cookie = export->exp_handle.h_cookie;
1427         class_export_put(export);
1428
1429         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1430                cluuid->uuid, conn->cookie);
1431         RETURN(0);
1432 }
1433 EXPORT_SYMBOL(class_connect);
1434
1435 /* if export is involved in recovery then clean up related things */
1436 static void class_export_recovery_cleanup(struct obd_export *exp)
1437 {
1438         struct obd_device *obd = exp->exp_obd;
1439
1440         spin_lock(&obd->obd_recovery_task_lock);
1441         if (obd->obd_recovering) {
1442                 if (exp->exp_in_recovery) {
1443                         spin_lock(&exp->exp_lock);
1444                         exp->exp_in_recovery = 0;
1445                         spin_unlock(&exp->exp_lock);
1446                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1447                         atomic_dec(&obd->obd_connected_clients);
1448                 }
1449
1450                 /* if called during recovery then should update
1451                  * obd_stale_clients counter,
1452                  * lightweight exports are not counted */
1453                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1454                         exp->exp_obd->obd_stale_clients++;
1455         }
1456         spin_unlock(&obd->obd_recovery_task_lock);
1457
1458         spin_lock(&exp->exp_lock);
1459         /** Cleanup req replay fields */
1460         if (exp->exp_req_replay_needed) {
1461                 exp->exp_req_replay_needed = 0;
1462
1463                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1464                 atomic_dec(&obd->obd_req_replay_clients);
1465         }
1466
1467         /** Cleanup lock replay data */
1468         if (exp->exp_lock_replay_needed) {
1469                 exp->exp_lock_replay_needed = 0;
1470
1471                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1472                 atomic_dec(&obd->obd_lock_replay_clients);
1473         }
1474         spin_unlock(&exp->exp_lock);
1475 }
1476
1477 /* This function removes 1-3 references from the export:
1478  * 1 - for export pointer passed
1479  * and if disconnect really need
1480  * 2 - removing from hash
1481  * 3 - in client_unlink_export
1482  * The export pointer passed to this function can destroyed */
1483 int class_disconnect(struct obd_export *export)
1484 {
1485         int already_disconnected;
1486         ENTRY;
1487
1488         if (export == NULL) {
1489                 CWARN("attempting to free NULL export %p\n", export);
1490                 RETURN(-EINVAL);
1491         }
1492
1493         spin_lock(&export->exp_lock);
1494         already_disconnected = export->exp_disconnected;
1495         export->exp_disconnected = 1;
1496         /*  We hold references of export for uuid hash
1497          *  and nid_hash and export link at least. So
1498          *  it is safe to call cfs_hash_del in there.  */
1499         if (!hlist_unhashed(&export->exp_nid_hash))
1500                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1501                              &export->exp_connection->c_peer.nid,
1502                              &export->exp_nid_hash);
1503         spin_unlock(&export->exp_lock);
1504
1505         /* class_cleanup(), abort_recovery(), and class_fail_export()
1506          * all end up in here, and if any of them race we shouldn't
1507          * call extra class_export_puts(). */
1508         if (already_disconnected) {
1509                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1510                 GOTO(no_disconn, already_disconnected);
1511         }
1512
1513         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1514                export->exp_handle.h_cookie);
1515
1516         class_export_recovery_cleanup(export);
1517         class_unlink_export(export);
1518 no_disconn:
1519         class_export_put(export);
1520         RETURN(0);
1521 }
1522 EXPORT_SYMBOL(class_disconnect);
1523
1524 /* Return non-zero for a fully connected export */
1525 int class_connected_export(struct obd_export *exp)
1526 {
1527         int connected = 0;
1528
1529         if (exp) {
1530                 spin_lock(&exp->exp_lock);
1531                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1532                 spin_unlock(&exp->exp_lock);
1533         }
1534         return connected;
1535 }
1536 EXPORT_SYMBOL(class_connected_export);
1537
1538 static void class_disconnect_export_list(struct list_head *list,
1539                                          enum obd_option flags)
1540 {
1541         int rc;
1542         struct obd_export *exp;
1543         ENTRY;
1544
1545         /* It's possible that an export may disconnect itself, but
1546          * nothing else will be added to this list. */
1547         while (!list_empty(list)) {
1548                 exp = list_entry(list->next, struct obd_export,
1549                                  exp_obd_chain);
1550                 /* need for safe call CDEBUG after obd_disconnect */
1551                 class_export_get(exp);
1552
1553                 spin_lock(&exp->exp_lock);
1554                 exp->exp_flags = flags;
1555                 spin_unlock(&exp->exp_lock);
1556
1557                 if (obd_uuid_equals(&exp->exp_client_uuid,
1558                                     &exp->exp_obd->obd_uuid)) {
1559                         CDEBUG(D_HA,
1560                                "exp %p export uuid == obd uuid, don't discon\n",
1561                                exp);
1562                         /* Need to delete this now so we don't end up pointing
1563                          * to work_list later when this export is cleaned up. */
1564                         list_del_init(&exp->exp_obd_chain);
1565                         class_export_put(exp);
1566                         continue;
1567                 }
1568
1569                 class_export_get(exp);
1570                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1571                        "last request at %lld\n",
1572                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1573                        exp, exp->exp_last_request_time);
1574                 /* release one export reference anyway */
1575                 rc = obd_disconnect(exp);
1576
1577                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1578                        obd_export_nid2str(exp), exp, rc);
1579                 class_export_put(exp);
1580         }
1581         EXIT;
1582 }
1583
1584 void class_disconnect_exports(struct obd_device *obd)
1585 {
1586         struct list_head work_list;
1587         ENTRY;
1588
1589         /* Move all of the exports from obd_exports to a work list, en masse. */
1590         INIT_LIST_HEAD(&work_list);
1591         spin_lock(&obd->obd_dev_lock);
1592         list_splice_init(&obd->obd_exports, &work_list);
1593         list_splice_init(&obd->obd_delayed_exports, &work_list);
1594         spin_unlock(&obd->obd_dev_lock);
1595
1596         if (!list_empty(&work_list)) {
1597                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1598                        "disconnecting them\n", obd->obd_minor, obd);
1599                 class_disconnect_export_list(&work_list,
1600                                              exp_flags_from_obd(obd));
1601         } else
1602                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1603                        obd->obd_minor, obd);
1604         EXIT;
1605 }
1606 EXPORT_SYMBOL(class_disconnect_exports);
1607
1608 /* Remove exports that have not completed recovery.
1609  */
1610 void class_disconnect_stale_exports(struct obd_device *obd,
1611                                     int (*test_export)(struct obd_export *))
1612 {
1613         struct list_head work_list;
1614         struct obd_export *exp, *n;
1615         int evicted = 0;
1616         ENTRY;
1617
1618         INIT_LIST_HEAD(&work_list);
1619         spin_lock(&obd->obd_dev_lock);
1620         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1621                                  exp_obd_chain) {
1622                 /* don't count self-export as client */
1623                 if (obd_uuid_equals(&exp->exp_client_uuid,
1624                                     &exp->exp_obd->obd_uuid))
1625                         continue;
1626
1627                 /* don't evict clients which have no slot in last_rcvd
1628                  * (e.g. lightweight connection) */
1629                 if (exp->exp_target_data.ted_lr_idx == -1)
1630                         continue;
1631
1632                 spin_lock(&exp->exp_lock);
1633                 if (exp->exp_failed || test_export(exp)) {
1634                         spin_unlock(&exp->exp_lock);
1635                         continue;
1636                 }
1637                 exp->exp_failed = 1;
1638                 spin_unlock(&exp->exp_lock);
1639
1640                 list_move(&exp->exp_obd_chain, &work_list);
1641                 evicted++;
1642                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1643                        obd->obd_name, exp->exp_client_uuid.uuid,
1644                        obd_export_nid2str(exp));
1645                 print_export_data(exp, "EVICTING", 0, D_HA);
1646         }
1647         spin_unlock(&obd->obd_dev_lock);
1648
1649         if (evicted)
1650                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1651                               obd->obd_name, evicted);
1652
1653         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1654                                                  OBD_OPT_ABORT_RECOV);
1655         EXIT;
1656 }
1657 EXPORT_SYMBOL(class_disconnect_stale_exports);
1658
1659 void class_fail_export(struct obd_export *exp)
1660 {
1661         int rc, already_failed;
1662
1663         spin_lock(&exp->exp_lock);
1664         already_failed = exp->exp_failed;
1665         exp->exp_failed = 1;
1666         spin_unlock(&exp->exp_lock);
1667
1668         if (already_failed) {
1669                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1670                        exp, exp->exp_client_uuid.uuid);
1671                 return;
1672         }
1673
1674         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1675                exp, exp->exp_client_uuid.uuid);
1676
1677         if (obd_dump_on_timeout)
1678                 libcfs_debug_dumplog();
1679
1680         /* need for safe call CDEBUG after obd_disconnect */
1681         class_export_get(exp);
1682
1683         /* Most callers into obd_disconnect are removing their own reference
1684          * (request, for example) in addition to the one from the hash table.
1685          * We don't have such a reference here, so make one. */
1686         class_export_get(exp);
1687         rc = obd_disconnect(exp);
1688         if (rc)
1689                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1690         else
1691                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1692                        exp, exp->exp_client_uuid.uuid);
1693         class_export_put(exp);
1694 }
1695 EXPORT_SYMBOL(class_fail_export);
1696
1697 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1698 {
1699         struct cfs_hash *nid_hash;
1700         struct obd_export *doomed_exp = NULL;
1701         int exports_evicted = 0;
1702
1703         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1704
1705         spin_lock(&obd->obd_dev_lock);
1706         /* umount has run already, so evict thread should leave
1707          * its task to umount thread now */
1708         if (obd->obd_stopping) {
1709                 spin_unlock(&obd->obd_dev_lock);
1710                 return exports_evicted;
1711         }
1712         nid_hash = obd->obd_nid_hash;
1713         cfs_hash_getref(nid_hash);
1714         spin_unlock(&obd->obd_dev_lock);
1715
1716         do {
1717                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1718                 if (doomed_exp == NULL)
1719                         break;
1720
1721                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1722                          "nid %s found, wanted nid %s, requested nid %s\n",
1723                          obd_export_nid2str(doomed_exp),
1724                          libcfs_nid2str(nid_key), nid);
1725                 LASSERTF(doomed_exp != obd->obd_self_export,
1726                          "self-export is hashed by NID?\n");
1727                 exports_evicted++;
1728                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1729                               "request\n", obd->obd_name,
1730                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1731                               obd_export_nid2str(doomed_exp));
1732                 class_fail_export(doomed_exp);
1733                 class_export_put(doomed_exp);
1734         } while (1);
1735
1736         cfs_hash_putref(nid_hash);
1737
1738         if (!exports_evicted)
1739                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1740                        obd->obd_name, nid);
1741         return exports_evicted;
1742 }
1743 EXPORT_SYMBOL(obd_export_evict_by_nid);
1744
1745 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1746 {
1747         struct cfs_hash *uuid_hash;
1748         struct obd_export *doomed_exp = NULL;
1749         struct obd_uuid doomed_uuid;
1750         int exports_evicted = 0;
1751
1752         spin_lock(&obd->obd_dev_lock);
1753         if (obd->obd_stopping) {
1754                 spin_unlock(&obd->obd_dev_lock);
1755                 return exports_evicted;
1756         }
1757         uuid_hash = obd->obd_uuid_hash;
1758         cfs_hash_getref(uuid_hash);
1759         spin_unlock(&obd->obd_dev_lock);
1760
1761         obd_str2uuid(&doomed_uuid, uuid);
1762         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1763                 CERROR("%s: can't evict myself\n", obd->obd_name);
1764                 cfs_hash_putref(uuid_hash);
1765                 return exports_evicted;
1766         }
1767
1768         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1769
1770         if (doomed_exp == NULL) {
1771                 CERROR("%s: can't disconnect %s: no exports found\n",
1772                        obd->obd_name, uuid);
1773         } else {
1774                 CWARN("%s: evicting %s at adminstrative request\n",
1775                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1776                 class_fail_export(doomed_exp);
1777                 class_export_put(doomed_exp);
1778                 exports_evicted++;
1779         }
1780         cfs_hash_putref(uuid_hash);
1781
1782         return exports_evicted;
1783 }
1784
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export callback used by print_export_data() when lock
 * dumping is requested.  It has static storage, so it is NULL until
 * something assigns it. */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1789
1790 static void print_export_data(struct obd_export *exp, const char *status,
1791                               int locks, int debug_level)
1792 {
1793         struct ptlrpc_reply_state *rs;
1794         struct ptlrpc_reply_state *first_reply = NULL;
1795         int nreplies = 0;
1796
1797         spin_lock(&exp->exp_lock);
1798         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1799                             rs_exp_list) {
1800                 if (nreplies == 0)
1801                         first_reply = rs;
1802                 nreplies++;
1803         }
1804         spin_unlock(&exp->exp_lock);
1805
1806         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1807                "%p %s %llu stale:%d\n",
1808                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1809                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1810                atomic_read(&exp->exp_rpc_count),
1811                atomic_read(&exp->exp_cb_count),
1812                atomic_read(&exp->exp_locks_count),
1813                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1814                nreplies, first_reply, nreplies > 3 ? "..." : "",
1815                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1816 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1817         if (locks && class_export_dump_hook != NULL)
1818                 class_export_dump_hook(exp);
1819 #endif
1820 }
1821
1822 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1823 {
1824         struct obd_export *exp;
1825
1826         spin_lock(&obd->obd_dev_lock);
1827         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1828                 print_export_data(exp, "ACTIVE", locks, debug_level);
1829         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1830                 print_export_data(exp, "UNLINKED", locks, debug_level);
1831         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1832                 print_export_data(exp, "DELAYED", locks, debug_level);
1833         spin_unlock(&obd->obd_dev_lock);
1834 }
1835
1836 void obd_exports_barrier(struct obd_device *obd)
1837 {
1838         int waited = 2;
1839         LASSERT(list_empty(&obd->obd_exports));
1840         spin_lock(&obd->obd_dev_lock);
1841         while (!list_empty(&obd->obd_unlinked_exports)) {
1842                 spin_unlock(&obd->obd_dev_lock);
1843                 set_current_state(TASK_UNINTERRUPTIBLE);
1844                 schedule_timeout(cfs_time_seconds(waited));
1845                 if (waited > 5 && is_power_of_2(waited)) {
1846                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1847                                       "more than %d seconds. "
1848                                       "The obd refcount = %d. Is it stuck?\n",
1849                                       obd->obd_name, waited,
1850                                       atomic_read(&obd->obd_refcount));
1851                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1852                 }
1853                 waited *= 2;
1854                 spin_lock(&obd->obd_dev_lock);
1855         }
1856         spin_unlock(&obd->obd_dev_lock);
1857 }
1858 EXPORT_SYMBOL(obd_exports_barrier);
1859
1860 /**
1861  * Add export to the obd_zombe thread and notify it.
1862  */
1863 static void obd_zombie_export_add(struct obd_export *exp) {
1864         atomic_dec(&obd_stale_export_num);
1865         spin_lock(&exp->exp_obd->obd_dev_lock);
1866         LASSERT(!list_empty(&exp->exp_obd_chain));
1867         list_del_init(&exp->exp_obd_chain);
1868         spin_unlock(&exp->exp_obd->obd_dev_lock);
1869
1870         queue_work(zombie_wq, &exp->exp_zombie_work);
1871 }
1872
1873 /**
1874  * Add import to the obd_zombe thread and notify it.
1875  */
1876 static void obd_zombie_import_add(struct obd_import *imp) {
1877         LASSERT(imp->imp_sec == NULL);
1878
1879         queue_work(zombie_wq, &imp->imp_zombie_work);
1880 }
1881
1882 /**
1883  * wait when obd_zombie import/export queues become empty
1884  */
1885 void obd_zombie_barrier(void)
1886 {
1887         flush_workqueue(zombie_wq);
1888 }
1889 EXPORT_SYMBOL(obd_zombie_barrier);
1890
1891
1892 struct obd_export *obd_stale_export_get(void)
1893 {
1894         struct obd_export *exp = NULL;
1895         ENTRY;
1896
1897         spin_lock(&obd_stale_export_lock);
1898         if (!list_empty(&obd_stale_exports)) {
1899                 exp = list_entry(obd_stale_exports.next,
1900                                  struct obd_export, exp_stale_list);
1901                 list_del_init(&exp->exp_stale_list);
1902         }
1903         spin_unlock(&obd_stale_export_lock);
1904
1905         if (exp) {
1906                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1907                        atomic_read(&obd_stale_export_num));
1908         }
1909         RETURN(exp);
1910 }
1911 EXPORT_SYMBOL(obd_stale_export_get);
1912
1913 void obd_stale_export_put(struct obd_export *exp)
1914 {
1915         ENTRY;
1916
1917         LASSERT(list_empty(&exp->exp_stale_list));
1918         if (exp->exp_lock_hash &&
1919             atomic_read(&exp->exp_lock_hash->hs_count)) {
1920                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1921                        atomic_read(&obd_stale_export_num));
1922
1923                 spin_lock_bh(&exp->exp_bl_list_lock);
1924                 spin_lock(&obd_stale_export_lock);
1925                 /* Add to the tail if there is no blocked locks,
1926                  * to the head otherwise. */
1927                 if (list_empty(&exp->exp_bl_list))
1928                         list_add_tail(&exp->exp_stale_list,
1929                                       &obd_stale_exports);
1930                 else
1931                         list_add(&exp->exp_stale_list,
1932                                  &obd_stale_exports);
1933
1934                 spin_unlock(&obd_stale_export_lock);
1935                 spin_unlock_bh(&exp->exp_bl_list_lock);
1936         } else {
1937                 class_export_put(exp);
1938         }
1939         EXIT;
1940 }
1941 EXPORT_SYMBOL(obd_stale_export_put);
1942
1943 /**
1944  * Adjust the position of the export in the stale list,
1945  * i.e. move to the head of the list if is needed.
1946  **/
1947 void obd_stale_export_adjust(struct obd_export *exp)
1948 {
1949         LASSERT(exp != NULL);
1950         spin_lock_bh(&exp->exp_bl_list_lock);
1951         spin_lock(&obd_stale_export_lock);
1952
1953         if (!list_empty(&exp->exp_stale_list) &&
1954             !list_empty(&exp->exp_bl_list))
1955                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1956
1957         spin_unlock(&obd_stale_export_lock);
1958         spin_unlock_bh(&exp->exp_bl_list_lock);
1959 }
1960 EXPORT_SYMBOL(obd_stale_export_adjust);
1961
1962 /**
1963  * start destroy zombie import/export thread
1964  */
1965 int obd_zombie_impexp_init(void)
1966 {
1967         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1968         if (!zombie_wq)
1969                 return -ENOMEM;
1970
1971         return 0;
1972 }
1973
1974 /**
1975  * stop destroy zombie import/export thread
1976  */
1977 void obd_zombie_impexp_stop(void)
1978 {
1979         destroy_workqueue(zombie_wq);
1980         LASSERT(list_empty(&obd_stale_exports));
1981 }
1982
1983 /***** Kernel-userspace comm helpers *******/
1984
1985 /* Get length of entire message, including header */
1986 int kuc_len(int payload_len)
1987 {
1988         return sizeof(struct kuc_hdr) + payload_len;
1989 }
1990 EXPORT_SYMBOL(kuc_len);
1991
1992 /* Get a pointer to kuc header, given a ptr to the payload
1993  * @param p Pointer to payload area
1994  * @returns Pointer to kuc header
1995  */
1996 struct kuc_hdr * kuc_ptr(void *p)
1997 {
1998         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1999         LASSERT(lh->kuc_magic == KUC_MAGIC);
2000         return lh;
2001 }
2002 EXPORT_SYMBOL(kuc_ptr);
2003
2004 /* Alloc space for a message, and fill in header
2005  * @return Pointer to payload area
2006  */
2007 void *kuc_alloc(int payload_len, int transport, int type)
2008 {
2009         struct kuc_hdr *lh;
2010         int len = kuc_len(payload_len);
2011
2012         OBD_ALLOC(lh, len);
2013         if (lh == NULL)
2014                 return ERR_PTR(-ENOMEM);
2015
2016         lh->kuc_magic = KUC_MAGIC;
2017         lh->kuc_transport = transport;
2018         lh->kuc_msgtype = type;
2019         lh->kuc_msglen = len;
2020
2021         return (void *)(lh + 1);
2022 }
2023 EXPORT_SYMBOL(kuc_alloc);
2024
/* Free a message given the pointer to its payload area and the payload
 * length that was used to allocate it. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
2032
2033 struct obd_request_slot_waiter {
2034         struct list_head        orsw_entry;
2035         wait_queue_head_t       orsw_waitq;
2036         bool                    orsw_signaled;
2037 };
2038
2039 static bool obd_request_slot_avail(struct client_obd *cli,
2040                                    struct obd_request_slot_waiter *orsw)
2041 {
2042         bool avail;
2043
2044         spin_lock(&cli->cl_loi_list_lock);
2045         avail = !!list_empty(&orsw->orsw_entry);
2046         spin_unlock(&cli->cl_loi_list_lock);
2047
2048         return avail;
2049 };
2050
2051 /*
2052  * For network flow control, the RPC sponsor needs to acquire a credit
2053  * before sending the RPC. The credits count for a connection is defined
2054  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2055  * the subsequent RPC sponsors need to wait until others released their
2056  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2057  */
2058 int obd_get_request_slot(struct client_obd *cli)
2059 {
2060         struct obd_request_slot_waiter   orsw;
2061         struct l_wait_info               lwi;
2062         int                              rc;
2063
2064         spin_lock(&cli->cl_loi_list_lock);
2065         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2066                 cli->cl_rpcs_in_flight++;
2067                 spin_unlock(&cli->cl_loi_list_lock);
2068                 return 0;
2069         }
2070
2071         init_waitqueue_head(&orsw.orsw_waitq);
2072         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2073         orsw.orsw_signaled = false;
2074         spin_unlock(&cli->cl_loi_list_lock);
2075
2076         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2077         rc = l_wait_event(orsw.orsw_waitq,
2078                           obd_request_slot_avail(cli, &orsw) ||
2079                           orsw.orsw_signaled,
2080                           &lwi);
2081
2082         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2083          * freed but other (such as obd_put_request_slot) is using it. */
2084         spin_lock(&cli->cl_loi_list_lock);
2085         if (rc != 0) {
2086                 if (!orsw.orsw_signaled) {
2087                         if (list_empty(&orsw.orsw_entry))
2088                                 cli->cl_rpcs_in_flight--;
2089                         else
2090                                 list_del(&orsw.orsw_entry);
2091                 }
2092         }
2093
2094         if (orsw.orsw_signaled) {
2095                 LASSERT(list_empty(&orsw.orsw_entry));
2096
2097                 rc = -EINTR;
2098         }
2099         spin_unlock(&cli->cl_loi_list_lock);
2100
2101         return rc;
2102 }
2103 EXPORT_SYMBOL(obd_get_request_slot);
2104
2105 void obd_put_request_slot(struct client_obd *cli)
2106 {
2107         struct obd_request_slot_waiter *orsw;
2108
2109         spin_lock(&cli->cl_loi_list_lock);
2110         cli->cl_rpcs_in_flight--;
2111
2112         /* If there is free slot, wakeup the first waiter. */
2113         if (!list_empty(&cli->cl_flight_waiters) &&
2114             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2115                 orsw = list_entry(cli->cl_flight_waiters.next,
2116                                   struct obd_request_slot_waiter, orsw_entry);
2117                 list_del_init(&orsw->orsw_entry);
2118                 cli->cl_rpcs_in_flight++;
2119                 wake_up(&orsw->orsw_waitq);
2120         }
2121         spin_unlock(&cli->cl_loi_list_lock);
2122 }
2123 EXPORT_SYMBOL(obd_put_request_slot);
2124
2125 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2126 {
2127         return cli->cl_max_rpcs_in_flight;
2128 }
2129 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2130
2131 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2132 {
2133         struct obd_request_slot_waiter *orsw;
2134         __u32                           old;
2135         int                             diff;
2136         int                             i;
2137         char                            *typ_name;
2138         int                             rc;
2139
2140         if (max > OBD_MAX_RIF_MAX || max < 1)
2141                 return -ERANGE;
2142
2143         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2144         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2145                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2146                  * strictly lower that max_rpcs_in_flight */
2147                 if (max < 2) {
2148                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2149                                "because it must be higher than "
2150                                "max_mod_rpcs_in_flight value",
2151                                cli->cl_import->imp_obd->obd_name);
2152                         return -ERANGE;
2153                 }
2154                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2155                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2156                         if (rc != 0)
2157                                 return rc;
2158                 }
2159         }
2160
2161         spin_lock(&cli->cl_loi_list_lock);
2162         old = cli->cl_max_rpcs_in_flight;
2163         cli->cl_max_rpcs_in_flight = max;
2164         client_adjust_max_dirty(cli);
2165
2166         diff = max - old;
2167
2168         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2169         for (i = 0; i < diff; i++) {
2170                 if (list_empty(&cli->cl_flight_waiters))
2171                         break;
2172
2173                 orsw = list_entry(cli->cl_flight_waiters.next,
2174                                   struct obd_request_slot_waiter, orsw_entry);
2175                 list_del_init(&orsw->orsw_entry);
2176                 cli->cl_rpcs_in_flight++;
2177                 wake_up(&orsw->orsw_waitq);
2178         }
2179         spin_unlock(&cli->cl_loi_list_lock);
2180
2181         return 0;
2182 }
2183 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2184
2185 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2186 {
2187         return cli->cl_max_mod_rpcs_in_flight;
2188 }
2189 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2190
2191 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2192 {
2193         struct obd_connect_data *ocd;
2194         __u16 maxmodrpcs;
2195         __u16 prev;
2196
2197         if (max > OBD_MAX_RIF_MAX || max < 1)
2198                 return -ERANGE;
2199
2200         /* cannot exceed or equal max_rpcs_in_flight */
2201         if (max >= cli->cl_max_rpcs_in_flight) {
2202                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2203                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2204                        cli->cl_import->imp_obd->obd_name,
2205                        max, cli->cl_max_rpcs_in_flight);
2206                 return -ERANGE;
2207         }
2208
2209         /* cannot exceed max modify RPCs in flight supported by the server */
2210         ocd = &cli->cl_import->imp_connect_data;
2211         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2212                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2213         else
2214                 maxmodrpcs = 1;
2215         if (max > maxmodrpcs) {
2216                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2217                        "higher than max_mod_rpcs_per_client value (%hu) "
2218                        "returned by the server at connection\n",
2219                        cli->cl_import->imp_obd->obd_name,
2220                        max, maxmodrpcs);
2221                 return -ERANGE;
2222         }
2223
2224         spin_lock(&cli->cl_mod_rpcs_lock);
2225
2226         prev = cli->cl_max_mod_rpcs_in_flight;
2227         cli->cl_max_mod_rpcs_in_flight = max;
2228
2229         /* wakeup waiters if limit has been increased */
2230         if (cli->cl_max_mod_rpcs_in_flight > prev)
2231                 wake_up(&cli->cl_mod_rpcs_waitq);
2232
2233         spin_unlock(&cli->cl_mod_rpcs_lock);
2234
2235         return 0;
2236 }
2237 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2238
2239 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2240                                struct seq_file *seq)
2241 {
2242         unsigned long mod_tot = 0, mod_cum;
2243         struct timespec64 now;
2244         int i;
2245
2246         ktime_get_real_ts64(&now);
2247
2248         spin_lock(&cli->cl_mod_rpcs_lock);
2249
2250         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2251                    (s64)now.tv_sec, now.tv_nsec);
2252         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2253                    cli->cl_mod_rpcs_in_flight);
2254
2255         seq_printf(seq, "\n\t\t\tmodify\n");
2256         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2257
2258         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2259
2260         mod_cum = 0;
2261         for (i = 0; i < OBD_HIST_MAX; i++) {
2262                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2263                 mod_cum += mod;
2264                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2265                            i, mod, pct(mod, mod_tot),
2266                            pct(mod_cum, mod_tot));
2267                 if (mod_cum == mod_tot)
2268                         break;
2269         }
2270
2271         spin_unlock(&cli->cl_mod_rpcs_lock);
2272
2273         return 0;
2274 }
2275 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2276
2277 /* The number of modify RPCs sent in parallel is limited
2278  * because the server has a finite number of slots per client to
2279  * store request result and ensure reply reconstruction when needed.
2280  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2281  * that takes into account server limit and cl_max_rpcs_in_flight
2282  * value.
2283  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2284  * one close request is allowed above the maximum.
2285  */
2286 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2287                                                  bool close_req)
2288 {
2289         bool avail;
2290
2291         /* A slot is available if
2292          * - number of modify RPCs in flight is less than the max
2293          * - it's a close RPC and no other close request is in flight
2294          */
2295         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2296                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2297
2298         return avail;
2299 }
2300
2301 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2302                                          bool close_req)
2303 {
2304         bool avail;
2305
2306         spin_lock(&cli->cl_mod_rpcs_lock);
2307         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2308         spin_unlock(&cli->cl_mod_rpcs_lock);
2309         return avail;
2310 }
2311
2312 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2313 {
2314         if (it != NULL &&
2315             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2316              it->it_op == IT_READDIR ||
2317              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2318                         return true;
2319         return false;
2320 }
2321
2322 /* Get a modify RPC slot from the obd client @cli according
2323  * to the kind of operation @opc that is going to be sent
2324  * and the intent @it of the operation if it applies.
2325  * If the maximum number of modify RPCs in flight is reached
2326  * the thread is put to sleep.
2327  * Returns the tag to be set in the request message. Tag 0
2328  * is reserved for non-modifying requests.
2329  */
2330 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2331                            struct lookup_intent *it)
2332 {
2333         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2334         bool                    close_req = false;
2335         __u16                   i, max;
2336
2337         /* read-only metadata RPCs don't consume a slot on MDT
2338          * for reply reconstruction
2339          */
2340         if (obd_skip_mod_rpc_slot(it))
2341                 return 0;
2342
2343         if (opc == MDS_CLOSE)
2344                 close_req = true;
2345
2346         do {
2347                 spin_lock(&cli->cl_mod_rpcs_lock);
2348                 max = cli->cl_max_mod_rpcs_in_flight;
2349                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2350                         /* there is a slot available */
2351                         cli->cl_mod_rpcs_in_flight++;
2352                         if (close_req)
2353                                 cli->cl_close_rpcs_in_flight++;
2354                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2355                                          cli->cl_mod_rpcs_in_flight);
2356                         /* find a free tag */
2357                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2358                                                 max + 1);
2359                         LASSERT(i < OBD_MAX_RIF_MAX);
2360                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2361                         spin_unlock(&cli->cl_mod_rpcs_lock);
2362                         /* tag 0 is reserved for non-modify RPCs */
2363                         return i + 1;
2364                 }
2365                 spin_unlock(&cli->cl_mod_rpcs_lock);
2366
2367                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2368                        "opc %u, max %hu\n",
2369                        cli->cl_import->imp_obd->obd_name, opc, max);
2370
2371                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2372                                        obd_mod_rpc_slot_avail(cli, close_req),
2373                                        &lwi);
2374         } while (true);
2375 }
2376 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2377
2378 /* Put a modify RPC slot from the obd client @cli according
2379  * to the kind of operation @opc that has been sent and the
2380  * intent @it of the operation if it applies.
2381  */
2382 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2383                           struct lookup_intent *it, __u16 tag)
2384 {
2385         bool                    close_req = false;
2386
2387         if (obd_skip_mod_rpc_slot(it))
2388                 return;
2389
2390         if (opc == MDS_CLOSE)
2391                 close_req = true;
2392
2393         spin_lock(&cli->cl_mod_rpcs_lock);
2394         cli->cl_mod_rpcs_in_flight--;
2395         if (close_req)
2396                 cli->cl_close_rpcs_in_flight--;
2397         /* release the tag in the bitmap */
2398         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2399         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2400         spin_unlock(&cli->cl_mod_rpcs_lock);
2401         wake_up(&cli->cl_mod_rpcs_waitq);
2402 }
2403 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2404