Whamcloud - gitweb
LU-13070 mdd: try old format for orphan names during recovery
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 static DEFINE_SPINLOCK(obd_types_lock); /* guards the obd_types list */
50 static LIST_HEAD(obd_types); /* registered obd_type entries, linked through typ_chain */
51 DEFINE_RWLOCK(obd_dev_lock); /* guards obd_devs[] and the obd_minor slot assignment */
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES]; /* device table, indexed by obd_minor */
53
54 static struct kmem_cache *obd_device_cachep; /* slab cache that struct obd_device is allocated from */
55
56 static struct workqueue_struct *zombie_wq; /* NOTE(review): not referenced in this part of the file - confirm its users further down */
57
58 static void obd_zombie_export_add(struct obd_export *exp); /* forward declaration */
59 static void obd_zombie_import_add(struct obd_import *imp); /* forward declaration */
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks, int debug_level); /* forward declaration */
62
63 static LIST_HEAD(obd_stale_exports); /* NOTE(review): presumably protected by obd_stale_export_lock - confirm */
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0); /* counter, starts at zero */
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c); /* exported function-pointer hook; it is not assigned in this part of the file */
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
/* kobject release callback for class_ktype: frees the kobject itself. */
static void class_sysfs_release(struct kobject *kobj)
{
	OBD_FREE_PTR(kobj);
}
165
166 static struct kobj_type class_ktype = {
167         .sysfs_ops      = &lustre_sysfs_ops,
168         .release        = class_sysfs_release,
169 };
170
171 struct kobject *class_setup_tunables(const char *name)
172 {
173         struct kobject *kobj;
174         int rc;
175
176 #ifdef HAVE_SERVER_SUPPORT
177         kobj = kset_find_obj(lustre_kset, name);
178         if (kobj)
179                 return kobj;
180 #endif
181         OBD_ALLOC(kobj, sizeof(*kobj));
182         if (!kobj)
183                 return ERR_PTR(-ENOMEM);
184
185         kobj->kset = lustre_kset;
186         kobject_init(kobj, &class_ktype);
187         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
188         if (rc) {
189                 kobject_put(kobj);
190                 return ERR_PTR(rc);
191         }
192         return kobj;
193 }
194 EXPORT_SYMBOL(class_setup_tunables);
195
196 #define CLASS_MAX_NAME 1024
197
198 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
199                         bool enable_proc, struct lprocfs_vars *vars,
200                         const char *name, struct lu_device_type *ldt)
201 {
202         struct obd_type *type;
203 #ifdef HAVE_SERVER_SUPPORT
204         struct qstr dname;
205 #endif /* HAVE_SERVER_SUPPORT */
206         int rc = 0;
207
208         ENTRY;
209         /* sanity check */
210         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
211
212         if (class_search_type(name)) {
213                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
214                 RETURN(-EEXIST);
215         }
216
217         rc = -ENOMEM;
218         OBD_ALLOC(type, sizeof(*type));
219         if (type == NULL)
220                 RETURN(rc);
221
222         OBD_ALLOC_PTR(type->typ_dt_ops);
223         OBD_ALLOC_PTR(type->typ_md_ops);
224         OBD_ALLOC(type->typ_name, strlen(name) + 1);
225
226         if (type->typ_dt_ops == NULL ||
227             type->typ_md_ops == NULL ||
228             type->typ_name == NULL)
229                 GOTO (failed, rc);
230
231         *(type->typ_dt_ops) = *dt_ops;
232         /* md_ops is optional */
233         if (md_ops)
234                 *(type->typ_md_ops) = *md_ops;
235         strcpy(type->typ_name, name);
236         spin_lock_init(&type->obd_type_lock);
237
238 #ifdef CONFIG_PROC_FS
239         if (enable_proc) {
240                 type->typ_procroot = lprocfs_register(type->typ_name,
241                                                       proc_lustre_root,
242                                                       vars, type);
243                 if (IS_ERR(type->typ_procroot)) {
244                         rc = PTR_ERR(type->typ_procroot);
245                         type->typ_procroot = NULL;
246                         GOTO(failed, rc);
247                 }
248         }
249 #endif
250 #ifdef HAVE_SERVER_SUPPORT
251         dname.name = name;
252         dname.len = strlen(dname.name);
253         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
254                                        dname.len);
255         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
256         if (type->typ_debugfs_entry) {
257                 dput(type->typ_debugfs_entry);
258                 type->typ_sym_filter = true;
259                 goto dir_exist;
260         }
261 #endif /* HAVE_SERVER_SUPPORT */
262
263         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
264                                                     debugfs_lustre_root,
265                                                     NULL, type);
266         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
267                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
268                                              : -ENOMEM;
269                 type->typ_debugfs_entry = NULL;
270                 GOTO(failed, rc);
271         }
272 #ifdef HAVE_SERVER_SUPPORT
273 dir_exist:
274 #endif
275         type->typ_kobj = class_setup_tunables(type->typ_name);
276         if (IS_ERR(type->typ_kobj))
277                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
278
279         if (ldt) {
280                 type->typ_lu = ldt;
281                 rc = lu_device_type_init(ldt);
282                 if (rc) {
283                         kobject_put(type->typ_kobj);
284                         GOTO(failed, rc);
285                 }
286         }
287
288         spin_lock(&obd_types_lock);
289         list_add(&type->typ_chain, &obd_types);
290         spin_unlock(&obd_types_lock);
291
292         RETURN(0);
293
294 failed:
295 #ifdef HAVE_SERVER_SUPPORT
296         if (type->typ_sym_filter)
297                 type->typ_debugfs_entry = NULL;
298 #endif
299         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
300                 ldebugfs_remove(&type->typ_debugfs_entry);
301         if (type->typ_name != NULL) {
302 #ifdef CONFIG_PROC_FS
303                 if (type->typ_procroot != NULL)
304                         remove_proc_subtree(type->typ_name, proc_lustre_root);
305 #endif
306                 OBD_FREE(type->typ_name, strlen(name) + 1);
307         }
308         if (type->typ_md_ops != NULL)
309                 OBD_FREE_PTR(type->typ_md_ops);
310         if (type->typ_dt_ops != NULL)
311                 OBD_FREE_PTR(type->typ_dt_ops);
312         OBD_FREE(type, sizeof(*type));
313         RETURN(rc);
314 }
315 EXPORT_SYMBOL(class_register_type);
316
317 int class_unregister_type(const char *name)
318 {
319         struct obd_type *type = class_search_type(name);
320         ENTRY;
321
322         if (!type) {
323                 CERROR("unknown obd type\n");
324                 RETURN(-EINVAL);
325         }
326
327         if (type->typ_refcnt) {
328                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329                 /* This is a bad situation, let's make the best of it */
330                 /* Remove ops, but leave the name for debugging */
331                 OBD_FREE_PTR(type->typ_dt_ops);
332                 OBD_FREE_PTR(type->typ_md_ops);
333                 RETURN(-EBUSY);
334         }
335
336         kobject_put(type->typ_kobj);
337
338         /* we do not use type->typ_procroot as for compatibility purposes
339          * other modules can share names (i.e. lod can use lov entry). so
340          * we can't reference pointer as it can get invalided when another
341          * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343         if (type->typ_procroot != NULL)
344                 remove_proc_subtree(type->typ_name, proc_lustre_root);
345         if (type->typ_procsym != NULL)
346                 lprocfs_remove(&type->typ_procsym);
347 #endif
348 #ifdef HAVE_SERVER_SUPPORT
349         if (type->typ_sym_filter)
350                 type->typ_debugfs_entry = NULL;
351 #endif
352         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353                 ldebugfs_remove(&type->typ_debugfs_entry);
354
355         if (type->typ_lu)
356                 lu_device_type_fini(type->typ_lu);
357
358         spin_lock(&obd_types_lock);
359         list_del(&type->typ_chain);
360         spin_unlock(&obd_types_lock);
361         OBD_FREE(type->typ_name, strlen(name) + 1);
362         if (type->typ_dt_ops != NULL)
363                 OBD_FREE_PTR(type->typ_dt_ops);
364         if (type->typ_md_ops != NULL)
365                 OBD_FREE_PTR(type->typ_md_ops);
366         OBD_FREE(type, sizeof(*type));
367         RETURN(0);
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
370
371 /**
372  * Create a new obd device.
373  *
374  * Allocate the new obd_device and initialize it.
375  *
376  * \param[in] type_name obd device type string.
377  * \param[in] name      obd device name.
378  * \param[in] uuid      obd device UUID
379  *
380  * \retval newdev         pointer to created obd_device
381  * \retval ERR_PTR(errno) on error
382  */
383 struct obd_device *class_newdev(const char *type_name, const char *name,
384                                 const char *uuid)
385 {
386         struct obd_device *newdev;
387         struct obd_type *type = NULL;
388         ENTRY;
389
390         if (strlen(name) >= MAX_OBD_NAME) {
391                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392                 RETURN(ERR_PTR(-EINVAL));
393         }
394
395         type = class_get_type(type_name);
396         if (type == NULL){
397                 CERROR("OBD: unknown type: %s\n", type_name);
398                 RETURN(ERR_PTR(-ENODEV));
399         }
400
401         newdev = obd_device_alloc();
402         if (newdev == NULL) {
403                 class_put_type(type);
404                 RETURN(ERR_PTR(-ENOMEM));
405         }
406         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408         newdev->obd_type = type;
409         newdev->obd_minor = -1;
410
411         rwlock_init(&newdev->obd_pool_lock);
412         newdev->obd_pool_limit = 0;
413         newdev->obd_pool_slv = 0;
414
415         INIT_LIST_HEAD(&newdev->obd_exports);
416         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418         INIT_LIST_HEAD(&newdev->obd_exports_timed);
419         INIT_LIST_HEAD(&newdev->obd_nid_stats);
420         spin_lock_init(&newdev->obd_nid_lock);
421         spin_lock_init(&newdev->obd_dev_lock);
422         mutex_init(&newdev->obd_dev_mutex);
423         spin_lock_init(&newdev->obd_osfs_lock);
424         /* newdev->obd_osfs_age must be set to a value in the distant
425          * past to guarantee a fresh statfs is fetched on mount. */
426         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
427
428         /* XXX belongs in setup not attach  */
429         init_rwsem(&newdev->obd_observer_link_sem);
430         /* recovery data */
431         spin_lock_init(&newdev->obd_recovery_task_lock);
432         init_waitqueue_head(&newdev->obd_next_transno_waitq);
433         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437         INIT_LIST_HEAD(&newdev->obd_evict_list);
438         INIT_LIST_HEAD(&newdev->obd_lwp_list);
439
440         llog_group_init(&newdev->obd_olg);
441         /* Detach drops this */
442         atomic_set(&newdev->obd_refcount, 1);
443         lu_ref_init(&newdev->obd_reference);
444         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
445
446         newdev->obd_conn_inprogress = 0;
447
448         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
449
450         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451                newdev->obd_name, newdev);
452
453         return newdev;
454 }
455
456 /**
457  * Free obd device.
458  *
459  * \param[in] obd obd_device to be freed
460  *
461  * \retval none
462  */
463 void class_free_dev(struct obd_device *obd)
464 {
465         struct obd_type *obd_type = obd->obd_type;
466
467         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470                  "obd %p != obd_devs[%d] %p\n",
471                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473                  "obd_refcount should be 0, not %d\n",
474                  atomic_read(&obd->obd_refcount));
475         LASSERT(obd_type != NULL);
476
477         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478                obd->obd_name, obd->obd_type->typ_name);
479
480         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481                          obd->obd_name, obd->obd_uuid.uuid);
482         if (obd->obd_stopping) {
483                 int err;
484
485                 /* If we're not stopping, we were never set up */
486                 err = obd_cleanup(obd);
487                 if (err)
488                         CERROR("Cleanup %s returned %d\n",
489                                 obd->obd_name, err);
490         }
491
492         obd_device_free(obd);
493
494         class_put_type(obd_type);
495 }
496
497 /**
498  * Unregister obd device.
499  *
500  * Free slot in obd_dev[] used by \a obd.
501  *
502  * \param[in] new_obd obd_device to be unregistered
503  *
504  * \retval none
505  */
506 void class_unregister_device(struct obd_device *obd)
507 {
508         write_lock(&obd_dev_lock);
509         if (obd->obd_minor >= 0) {
510                 LASSERT(obd_devs[obd->obd_minor] == obd);
511                 obd_devs[obd->obd_minor] = NULL;
512                 obd->obd_minor = -1;
513         }
514         write_unlock(&obd_dev_lock);
515 }
516
517 /**
518  * Register obd device.
519  *
520  * Find free slot in obd_devs[], fills it with \a new_obd.
521  *
522  * \param[in] new_obd obd_device to be registered
523  *
524  * \retval 0          success
525  * \retval -EEXIST    device with this name is registered
526  * \retval -EOVERFLOW obd_devs[] is full
527  */
528 int class_register_device(struct obd_device *new_obd)
529 {
530         int ret = 0;
531         int i;
532         int new_obd_minor = 0;
533         bool minor_assign = false;
534         bool retried = false;
535
536 again:
537         write_lock(&obd_dev_lock);
538         for (i = 0; i < class_devno_max(); i++) {
539                 struct obd_device *obd = class_num2obd(i);
540
541                 if (obd != NULL &&
542                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
543
544                         if (!retried) {
545                                 write_unlock(&obd_dev_lock);
546
547                                 /* the obd_device could be waited to be
548                                  * destroyed by the "obd_zombie_impexp_thread".
549                                  */
550                                 obd_zombie_barrier();
551                                 retried = true;
552                                 goto again;
553                         }
554
555                         CERROR("%s: already exists, won't add\n",
556                                obd->obd_name);
557                         /* in case we found a free slot before duplicate */
558                         minor_assign = false;
559                         ret = -EEXIST;
560                         break;
561                 }
562                 if (!minor_assign && obd == NULL) {
563                         new_obd_minor = i;
564                         minor_assign = true;
565                 }
566         }
567
568         if (minor_assign) {
569                 new_obd->obd_minor = new_obd_minor;
570                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572                 obd_devs[new_obd_minor] = new_obd;
573         } else {
574                 if (ret == 0) {
575                         ret = -EOVERFLOW;
576                         CERROR("%s: all %u/%u devices used, increase "
577                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578                                i, class_devno_max(), ret);
579                 }
580         }
581         write_unlock(&obd_dev_lock);
582
583         RETURN(ret);
584 }
585
586 static int class_name2dev_nolock(const char *name)
587 {
588         int i;
589
590         if (!name)
591                 return -1;
592
593         for (i = 0; i < class_devno_max(); i++) {
594                 struct obd_device *obd = class_num2obd(i);
595
596                 if (obd && strcmp(name, obd->obd_name) == 0) {
597                         /* Make sure we finished attaching before we give
598                            out any references */
599                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600                         if (obd->obd_attached) {
601                                 return i;
602                         }
603                         break;
604                 }
605         }
606
607         return -1;
608 }
609
610 int class_name2dev(const char *name)
611 {
612         int i;
613
614         if (!name)
615                 return -1;
616
617         read_lock(&obd_dev_lock);
618         i = class_name2dev_nolock(name);
619         read_unlock(&obd_dev_lock);
620
621         return i;
622 }
623 EXPORT_SYMBOL(class_name2dev);
624
/*
 * Look up an attached obd device by name.
 *
 * Valid indices are 0 .. class_devno_max() - 1, so an index equal to
 * class_devno_max() is rejected as well.
 * Returns NULL when no such device is found.  No reference is taken.
 */
struct obd_device *class_name2obd(const char *name)
{
	int dev = class_name2dev(name);

	if (dev < 0 || dev >= class_devno_max())
		return NULL;
	return class_num2obd(dev);
}
EXPORT_SYMBOL(class_name2obd);
634
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
636 {
637         int i;
638
639         for (i = 0; i < class_devno_max(); i++) {
640                 struct obd_device *obd = class_num2obd(i);
641
642                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
644                         return i;
645                 }
646         }
647
648         return -1;
649 }
650
651 int class_uuid2dev(struct obd_uuid *uuid)
652 {
653         int i;
654
655         read_lock(&obd_dev_lock);
656         i = class_uuid2dev_nolock(uuid);
657         read_unlock(&obd_dev_lock);
658
659         return i;
660 }
661 EXPORT_SYMBOL(class_uuid2dev);
662
/*
 * Look up an obd device by UUID.  Returns NULL if no device carries that
 * UUID.  No reference is taken on the result.
 */
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
	int dev = class_uuid2dev(uuid);

	return dev < 0 ? NULL : class_num2obd(dev);
}
EXPORT_SYMBOL(class_uuid2obd);
671
672 /**
673  * Get obd device from ::obd_devs[]
674  *
675  * \param num [in] array index
676  *
677  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678  *         otherwise return the obd device there.
679  */
680 struct obd_device *class_num2obd(int num)
681 {
682         struct obd_device *obd = NULL;
683
684         if (num < class_devno_max()) {
685                 obd = obd_devs[num];
686                 if (obd == NULL)
687                         return NULL;
688
689                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690                          "%p obd_magic %08x != %08x\n",
691                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692                 LASSERTF(obd->obd_minor == num,
693                          "%p obd_minor %0d != %0d\n",
694                          obd, obd->obd_minor, num);
695         }
696
697         return obd;
698 }
699
700 /**
701  * Find obd in obd_dev[] by name or uuid.
702  *
703  * Increment obd's refcount if found.
704  *
705  * \param[in] str obd name or uuid
706  *
707  * \retval NULL    if not found
708  * \retval target  pointer to found obd_device
709  */
710 struct obd_device *class_dev_by_str(const char *str)
711 {
712         struct obd_device *target = NULL;
713         struct obd_uuid tgtuuid;
714         int rc;
715
716         obd_str2uuid(&tgtuuid, str);
717
718         read_lock(&obd_dev_lock);
719         rc = class_uuid2dev_nolock(&tgtuuid);
720         if (rc < 0)
721                 rc = class_name2dev_nolock(str);
722
723         if (rc >= 0)
724                 target = class_num2obd(rc);
725
726         if (target != NULL)
727                 class_incref(target, "find", current);
728         read_unlock(&obd_dev_lock);
729
730         RETURN(target);
731 }
732 EXPORT_SYMBOL(class_dev_by_str);
733
734 /**
735  * Get obd devices count. Device in any
736  *    state are counted
737  * \retval obd device count
738  */
739 int get_devices_count(void)
740 {
741         int index, max_index = class_devno_max(), dev_count = 0;
742
743         read_lock(&obd_dev_lock);
744         for (index = 0; index <= max_index; index++) {
745                 struct obd_device *obd = class_num2obd(index);
746                 if (obd != NULL)
747                         dev_count++;
748         }
749         read_unlock(&obd_dev_lock);
750
751         return dev_count;
752 }
753 EXPORT_SYMBOL(get_devices_count);
754
755 void class_obd_list(void)
756 {
757         char *status;
758         int i;
759
760         read_lock(&obd_dev_lock);
761         for (i = 0; i < class_devno_max(); i++) {
762                 struct obd_device *obd = class_num2obd(i);
763
764                 if (obd == NULL)
765                         continue;
766                 if (obd->obd_stopping)
767                         status = "ST";
768                 else if (obd->obd_set_up)
769                         status = "UP";
770                 else if (obd->obd_attached)
771                         status = "AT";
772                 else
773                         status = "--";
774                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775                          i, status, obd->obd_type->typ_name,
776                          obd->obd_name, obd->obd_uuid.uuid,
777                          atomic_read(&obd->obd_refcount));
778         }
779         read_unlock(&obd_dev_lock);
780         return;
781 }
782
783 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
784    specified, then only the client with that uuid is returned,
785    otherwise any client connected to the tgt is returned. */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787                                           const char * typ_name,
788                                           struct obd_uuid *grp_uuid)
789 {
790         int i;
791
792         read_lock(&obd_dev_lock);
793         for (i = 0; i < class_devno_max(); i++) {
794                 struct obd_device *obd = class_num2obd(i);
795
796                 if (obd == NULL)
797                         continue;
798                 if ((strncmp(obd->obd_type->typ_name, typ_name,
799                              strlen(typ_name)) == 0)) {
800                         if (obd_uuid_equals(tgt_uuid,
801                                             &obd->u.cli.cl_target_uuid) &&
802                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
803                                                          &obd->obd_uuid) : 1)) {
804                                 read_unlock(&obd_dev_lock);
805                                 return obd;
806                         }
807                 }
808         }
809         read_unlock(&obd_dev_lock);
810
811         return NULL;
812 }
813 EXPORT_SYMBOL(class_find_client_obd);
814
815 /* Iterate the obd_device list looking devices have grp_uuid. Start
816    searching at *next, and if a device is found, the next index to look
817    at is saved in *next. If next is NULL, then the first matching device
818    will always be returned. */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
820 {
821         int i;
822
823         if (next == NULL)
824                 i = 0;
825         else if (*next >= 0 && *next < class_devno_max())
826                 i = *next;
827         else
828                 return NULL;
829
830         read_lock(&obd_dev_lock);
831         for (; i < class_devno_max(); i++) {
832                 struct obd_device *obd = class_num2obd(i);
833
834                 if (obd == NULL)
835                         continue;
836                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
837                         if (next != NULL)
838                                 *next = i+1;
839                         read_unlock(&obd_dev_lock);
840                         return obd;
841                 }
842         }
843         read_unlock(&obd_dev_lock);
844
845         return NULL;
846 }
847 EXPORT_SYMBOL(class_devices_in_group);
848
849 /**
850  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
851  * adjust sptlrpc settings accordingly.
852  */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
854 {
855         struct obd_device  *obd;
856         const char         *type;
857         int                 i, rc = 0, rc2;
858
859         LASSERT(namelen > 0);
860
861         read_lock(&obd_dev_lock);
862         for (i = 0; i < class_devno_max(); i++) {
863                 obd = class_num2obd(i);
864
865                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
866                         continue;
867
868                 /* only notify mdc, osc, osp, lwp, mdt, ost
869                  * because only these have a -sptlrpc llog */
870                 type = obd->obd_type->typ_name;
871                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876                     strcmp(type, LUSTRE_OST_NAME) != 0)
877                         continue;
878
879                 if (strncmp(obd->obd_name, fsname, namelen))
880                         continue;
881
882                 class_incref(obd, __FUNCTION__, obd);
883                 read_unlock(&obd_dev_lock);
884                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885                                          sizeof(KEY_SPTLRPC_CONF),
886                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
887                 rc = rc ? rc : rc2;
888                 class_decref(obd, __FUNCTION__, obd);
889                 read_lock(&obd_dev_lock);
890         }
891         read_unlock(&obd_dev_lock);
892         return rc;
893 }
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
895
896 void obd_cleanup_caches(void)
897 {
898         ENTRY;
899         if (obd_device_cachep) {
900                 kmem_cache_destroy(obd_device_cachep);
901                 obd_device_cachep = NULL;
902         }
903
904         EXIT;
905 }
906
907 int obd_init_caches(void)
908 {
909         int rc;
910         ENTRY;
911
912         LASSERT(obd_device_cachep == NULL);
913         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
914                                 sizeof(struct obd_device),
915                                 0, 0, 0, sizeof(struct obd_device), NULL);
916         if (!obd_device_cachep)
917                 GOTO(out, rc = -ENOMEM);
918
919         RETURN(0);
920 out:
921         obd_cleanup_caches();
922         RETURN(rc);
923 }
924
925 /* map connection to client */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
927 {
928         struct obd_export *export;
929         ENTRY;
930
931         if (!conn) {
932                 CDEBUG(D_CACHE, "looking for null handle\n");
933                 RETURN(NULL);
934         }
935
936         if (conn->cookie == -1) {  /* this means assign a new connection */
937                 CDEBUG(D_CACHE, "want a new connection\n");
938                 RETURN(NULL);
939         }
940
941         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942         export = class_handle2object(conn->cookie, NULL);
943         RETURN(export);
944 }
945 EXPORT_SYMBOL(class_conn2export);
946
947 struct obd_device *class_exp2obd(struct obd_export *exp)
948 {
949         if (exp)
950                 return exp->exp_obd;
951         return NULL;
952 }
953 EXPORT_SYMBOL(class_exp2obd);
954
955 struct obd_import *class_exp2cliimp(struct obd_export *exp)
956 {
957         struct obd_device *obd = exp->exp_obd;
958         if (obd == NULL)
959                 return NULL;
960         return obd->u.cli.cl_import;
961 }
962 EXPORT_SYMBOL(class_exp2cliimp);
963
964 /* Export management functions */
965 static void class_export_destroy(struct obd_export *exp)
966 {
967         struct obd_device *obd = exp->exp_obd;
968         ENTRY;
969
970         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
971         LASSERT(obd != NULL);
972
973         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
974                exp->exp_client_uuid.uuid, obd->obd_name);
975
976         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
977         if (exp->exp_connection)
978                 ptlrpc_put_connection_superhack(exp->exp_connection);
979
980         LASSERT(list_empty(&exp->exp_outstanding_replies));
981         LASSERT(list_empty(&exp->exp_uncommitted_replies));
982         LASSERT(list_empty(&exp->exp_req_replay_queue));
983         LASSERT(list_empty(&exp->exp_hp_rpcs));
984         obd_destroy_export(exp);
985         /* self export doesn't hold a reference to an obd, although it
986          * exists until freeing of the obd */
987         if (exp != obd->obd_self_export)
988                 class_decref(obd, "export", exp);
989
990         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
991         EXIT;
992 }
993
/* hop_addref callback of export_handle_ops: take a reference on the export */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
998
999 static struct portals_handle_ops export_handle_ops = {
1000         .hop_addref = export_handle_addref,
1001         .hop_free   = NULL,
1002 };
1003
1004 struct obd_export *class_export_get(struct obd_export *exp)
1005 {
1006         atomic_inc(&exp->exp_refcount);
1007         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1008                atomic_read(&exp->exp_refcount));
1009         return exp;
1010 }
1011 EXPORT_SYMBOL(class_export_get);
1012
1013 void class_export_put(struct obd_export *exp)
1014 {
1015         LASSERT(exp != NULL);
1016         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1017         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1018                atomic_read(&exp->exp_refcount) - 1);
1019
1020         if (atomic_dec_and_test(&exp->exp_refcount)) {
1021                 struct obd_device *obd = exp->exp_obd;
1022
1023                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1024                        exp, exp->exp_client_uuid.uuid);
1025
1026                 /* release nid stat refererence */
1027                 lprocfs_exp_cleanup(exp);
1028
1029                 if (exp == obd->obd_self_export) {
1030                         /* self export should be destroyed without
1031                          * zombie thread as it doesn't hold a
1032                          * reference to obd and doesn't hold any
1033                          * resources */
1034                         class_export_destroy(exp);
1035                         /* self export is destroyed, no class
1036                          * references exist and it is safe to free
1037                          * obd */
1038                         class_free_dev(obd);
1039                 } else {
1040                         LASSERT(!list_empty(&exp->exp_obd_chain));
1041                         obd_zombie_export_add(exp);
1042                 }
1043
1044         }
1045 }
1046 EXPORT_SYMBOL(class_export_put);
1047
1048 static void obd_zombie_exp_cull(struct work_struct *ws)
1049 {
1050         struct obd_export *export;
1051
1052         export = container_of(ws, struct obd_export, exp_zombie_work);
1053         class_export_destroy(export);
1054 }
1055
1056 /* Creates a new export, adds it to the hash table, and returns a
1057  * pointer to it. The refcount is 2: one for the hash reference, and
1058  * one for the pointer returned by this function. */
1059 struct obd_export *__class_new_export(struct obd_device *obd,
1060                                       struct obd_uuid *cluuid, bool is_self)
1061 {
1062         struct obd_export *export;
1063         struct cfs_hash *hash = NULL;
1064         int rc = 0;
1065         ENTRY;
1066
1067         OBD_ALLOC_PTR(export);
1068         if (!export)
1069                 return ERR_PTR(-ENOMEM);
1070
1071         export->exp_conn_cnt = 0;
1072         export->exp_lock_hash = NULL;
1073         export->exp_flock_hash = NULL;
1074         /* 2 = class_handle_hash + last */
1075         atomic_set(&export->exp_refcount, 2);
1076         atomic_set(&export->exp_rpc_count, 0);
1077         atomic_set(&export->exp_cb_count, 0);
1078         atomic_set(&export->exp_locks_count, 0);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080         INIT_LIST_HEAD(&export->exp_locks_list);
1081         spin_lock_init(&export->exp_locks_list_guard);
1082 #endif
1083         atomic_set(&export->exp_replay_count, 0);
1084         export->exp_obd = obd;
1085         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1086         spin_lock_init(&export->exp_uncommitted_replies_lock);
1087         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1088         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1089         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1090         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1091         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1092         class_handle_hash(&export->exp_handle, &export_handle_ops);
1093         export->exp_last_request_time = ktime_get_real_seconds();
1094         spin_lock_init(&export->exp_lock);
1095         spin_lock_init(&export->exp_rpc_lock);
1096         INIT_HLIST_NODE(&export->exp_uuid_hash);
1097         INIT_HLIST_NODE(&export->exp_nid_hash);
1098         INIT_HLIST_NODE(&export->exp_gen_hash);
1099         spin_lock_init(&export->exp_bl_list_lock);
1100         INIT_LIST_HEAD(&export->exp_bl_list);
1101         INIT_LIST_HEAD(&export->exp_stale_list);
1102         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1103
1104         export->exp_sp_peer = LUSTRE_SP_ANY;
1105         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1106         export->exp_client_uuid = *cluuid;
1107         obd_init_export(export);
1108
1109         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1110                 spin_lock(&obd->obd_dev_lock);
1111                 /* shouldn't happen, but might race */
1112                 if (obd->obd_stopping)
1113                         GOTO(exit_unlock, rc = -ENODEV);
1114
1115                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1116                 if (hash == NULL)
1117                         GOTO(exit_unlock, rc = -ENODEV);
1118                 spin_unlock(&obd->obd_dev_lock);
1119
1120                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1121                 if (rc != 0) {
1122                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1123                                       obd->obd_name, cluuid->uuid, rc);
1124                         GOTO(exit_err, rc = -EALREADY);
1125                 }
1126         }
1127
1128         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1129         spin_lock(&obd->obd_dev_lock);
1130         if (obd->obd_stopping) {
1131                 if (hash)
1132                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1133                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1134         }
1135
1136         if (!is_self) {
1137                 class_incref(obd, "export", export);
1138                 list_add_tail(&export->exp_obd_chain_timed,
1139                               &obd->obd_exports_timed);
1140                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1141                 obd->obd_num_exports++;
1142         } else {
1143                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1144                 INIT_LIST_HEAD(&export->exp_obd_chain);
1145         }
1146         spin_unlock(&obd->obd_dev_lock);
1147         if (hash)
1148                 cfs_hash_putref(hash);
1149         RETURN(export);
1150
1151 exit_unlock:
1152         spin_unlock(&obd->obd_dev_lock);
1153 exit_err:
1154         if (hash)
1155                 cfs_hash_putref(hash);
1156         class_handle_unhash(&export->exp_handle);
1157         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1158         obd_destroy_export(export);
1159         OBD_FREE_PTR(export);
1160         return ERR_PTR(rc);
1161 }
1162
1163 struct obd_export *class_new_export(struct obd_device *obd,
1164                                     struct obd_uuid *uuid)
1165 {
1166         return __class_new_export(obd, uuid, false);
1167 }
1168 EXPORT_SYMBOL(class_new_export);
1169
1170 struct obd_export *class_new_export_self(struct obd_device *obd,
1171                                          struct obd_uuid *uuid)
1172 {
1173         return __class_new_export(obd, uuid, true);
1174 }
1175
1176 void class_unlink_export(struct obd_export *exp)
1177 {
1178         class_handle_unhash(&exp->exp_handle);
1179
1180         if (exp->exp_obd->obd_self_export == exp) {
1181                 class_export_put(exp);
1182                 return;
1183         }
1184
1185         spin_lock(&exp->exp_obd->obd_dev_lock);
1186         /* delete an uuid-export hashitem from hashtables */
1187         if (!hlist_unhashed(&exp->exp_uuid_hash))
1188                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1189                              &exp->exp_client_uuid,
1190                              &exp->exp_uuid_hash);
1191
1192 #ifdef HAVE_SERVER_SUPPORT
1193         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1194                 struct tg_export_data   *ted = &exp->exp_target_data;
1195                 struct cfs_hash         *hash;
1196
1197                 /* Because obd_gen_hash will not be released until
1198                  * class_cleanup(), so hash should never be NULL here */
1199                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1200                 LASSERT(hash != NULL);
1201                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1202                              &exp->exp_gen_hash);
1203                 cfs_hash_putref(hash);
1204         }
1205 #endif /* HAVE_SERVER_SUPPORT */
1206
1207         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1208         list_del_init(&exp->exp_obd_chain_timed);
1209         exp->exp_obd->obd_num_exports--;
1210         spin_unlock(&exp->exp_obd->obd_dev_lock);
1211         atomic_inc(&obd_stale_export_num);
1212
1213         /* A reference is kept by obd_stale_exports list */
1214         obd_stale_export_put(exp);
1215 }
1216 EXPORT_SYMBOL(class_unlink_export);
1217
1218 /* Import management functions */
1219 static void obd_zombie_import_free(struct obd_import *imp)
1220 {
1221         ENTRY;
1222
1223         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1224                 imp->imp_obd->obd_name);
1225
1226         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1227
1228         ptlrpc_put_connection_superhack(imp->imp_connection);
1229
1230         while (!list_empty(&imp->imp_conn_list)) {
1231                 struct obd_import_conn *imp_conn;
1232
1233                 imp_conn = list_entry(imp->imp_conn_list.next,
1234                                       struct obd_import_conn, oic_item);
1235                 list_del_init(&imp_conn->oic_item);
1236                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1237                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1238         }
1239
1240         LASSERT(imp->imp_sec == NULL);
1241         class_decref(imp->imp_obd, "import", imp);
1242         OBD_FREE_PTR(imp);
1243         EXIT;
1244 }
1245
1246 struct obd_import *class_import_get(struct obd_import *import)
1247 {
1248         atomic_inc(&import->imp_refcount);
1249         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1250                atomic_read(&import->imp_refcount),
1251                import->imp_obd->obd_name);
1252         return import;
1253 }
1254 EXPORT_SYMBOL(class_import_get);
1255
1256 void class_import_put(struct obd_import *imp)
1257 {
1258         ENTRY;
1259
1260         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1261
1262         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1263                atomic_read(&imp->imp_refcount) - 1,
1264                imp->imp_obd->obd_name);
1265
1266         if (atomic_dec_and_test(&imp->imp_refcount)) {
1267                 CDEBUG(D_INFO, "final put import %p\n", imp);
1268                 obd_zombie_import_add(imp);
1269         }
1270
1271         EXIT;
1272 }
1273 EXPORT_SYMBOL(class_import_put);
1274
1275 static void init_imp_at(struct imp_at *at) {
1276         int i;
1277         at_init(&at->iat_net_latency, 0, 0);
1278         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1279                 /* max service estimates are tracked on the server side, so
1280                    don't use the AT history here, just use the last reported
1281                    val. (But keep hist for proc histogram, worst_ever) */
1282                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1283                         AT_FLG_NOHIST);
1284         }
1285 }
1286
1287 static void obd_zombie_imp_cull(struct work_struct *ws)
1288 {
1289         struct obd_import *import;
1290
1291         import = container_of(ws, struct obd_import, imp_zombie_work);
1292         obd_zombie_import_free(import);
1293 }
1294
1295 struct obd_import *class_new_import(struct obd_device *obd)
1296 {
1297         struct obd_import *imp;
1298         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1299
1300         OBD_ALLOC(imp, sizeof(*imp));
1301         if (imp == NULL)
1302                 return NULL;
1303
1304         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1305         INIT_LIST_HEAD(&imp->imp_replay_list);
1306         INIT_LIST_HEAD(&imp->imp_sending_list);
1307         INIT_LIST_HEAD(&imp->imp_delayed_list);
1308         INIT_LIST_HEAD(&imp->imp_committed_list);
1309         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1310         imp->imp_known_replied_xid = 0;
1311         imp->imp_replay_cursor = &imp->imp_committed_list;
1312         spin_lock_init(&imp->imp_lock);
1313         imp->imp_last_success_conn = 0;
1314         imp->imp_state = LUSTRE_IMP_NEW;
1315         imp->imp_obd = class_incref(obd, "import", imp);
1316         mutex_init(&imp->imp_sec_mutex);
1317         init_waitqueue_head(&imp->imp_recovery_waitq);
1318         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1319
1320         if (curr_pid_ns->child_reaper)
1321                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1322         else
1323                 imp->imp_sec_refpid = 1;
1324
1325         atomic_set(&imp->imp_refcount, 2);
1326         atomic_set(&imp->imp_unregistering, 0);
1327         atomic_set(&imp->imp_inflight, 0);
1328         atomic_set(&imp->imp_replay_inflight, 0);
1329         atomic_set(&imp->imp_inval_count, 0);
1330         INIT_LIST_HEAD(&imp->imp_conn_list);
1331         init_imp_at(&imp->imp_at);
1332
1333         /* the default magic is V2, will be used in connect RPC, and
1334          * then adjusted according to the flags in request/reply. */
1335         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1336
1337         return imp;
1338 }
1339 EXPORT_SYMBOL(class_new_import);
1340
1341 void class_destroy_import(struct obd_import *import)
1342 {
1343         LASSERT(import != NULL);
1344         LASSERT(import != LP_POISON);
1345
1346         spin_lock(&import->imp_lock);
1347         import->imp_generation++;
1348         spin_unlock(&import->imp_lock);
1349         class_import_put(import);
1350 }
1351 EXPORT_SYMBOL(class_destroy_import);
1352
1353 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1354
1355 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1356 {
1357         spin_lock(&exp->exp_locks_list_guard);
1358
1359         LASSERT(lock->l_exp_refs_nr >= 0);
1360
1361         if (lock->l_exp_refs_target != NULL &&
1362             lock->l_exp_refs_target != exp) {
1363                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1364                               exp, lock, lock->l_exp_refs_target);
1365         }
1366         if ((lock->l_exp_refs_nr ++) == 0) {
1367                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1368                 lock->l_exp_refs_target = exp;
1369         }
1370         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1371                lock, exp, lock->l_exp_refs_nr);
1372         spin_unlock(&exp->exp_locks_list_guard);
1373 }
1374 EXPORT_SYMBOL(__class_export_add_lock_ref);
1375
1376 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1377 {
1378         spin_lock(&exp->exp_locks_list_guard);
1379         LASSERT(lock->l_exp_refs_nr > 0);
1380         if (lock->l_exp_refs_target != exp) {
1381                 LCONSOLE_WARN("lock %p, "
1382                               "mismatching export pointers: %p, %p\n",
1383                               lock, lock->l_exp_refs_target, exp);
1384         }
1385         if (-- lock->l_exp_refs_nr == 0) {
1386                 list_del_init(&lock->l_exp_refs_link);
1387                 lock->l_exp_refs_target = NULL;
1388         }
1389         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1390                lock, exp, lock->l_exp_refs_nr);
1391         spin_unlock(&exp->exp_locks_list_guard);
1392 }
1393 EXPORT_SYMBOL(__class_export_del_lock_ref);
1394 #endif
1395
1396 /* A connection defines an export context in which preallocation can
1397    be managed. This releases the export pointer reference, and returns
1398    the export handle, so the export refcount is 1 when this function
1399    returns. */
1400 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1401                   struct obd_uuid *cluuid)
1402 {
1403         struct obd_export *export;
1404         LASSERT(conn != NULL);
1405         LASSERT(obd != NULL);
1406         LASSERT(cluuid != NULL);
1407         ENTRY;
1408
1409         export = class_new_export(obd, cluuid);
1410         if (IS_ERR(export))
1411                 RETURN(PTR_ERR(export));
1412
1413         conn->cookie = export->exp_handle.h_cookie;
1414         class_export_put(export);
1415
1416         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1417                cluuid->uuid, conn->cookie);
1418         RETURN(0);
1419 }
1420 EXPORT_SYMBOL(class_connect);
1421
1422 /* if export is involved in recovery then clean up related things */
1423 static void class_export_recovery_cleanup(struct obd_export *exp)
1424 {
1425         struct obd_device *obd = exp->exp_obd;
1426
1427         spin_lock(&obd->obd_recovery_task_lock);
1428         if (obd->obd_recovering) {
1429                 if (exp->exp_in_recovery) {
1430                         spin_lock(&exp->exp_lock);
1431                         exp->exp_in_recovery = 0;
1432                         spin_unlock(&exp->exp_lock);
1433                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1434                         atomic_dec(&obd->obd_connected_clients);
1435                 }
1436
1437                 /* if called during recovery then should update
1438                  * obd_stale_clients counter,
1439                  * lightweight exports are not counted */
1440                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1441                         exp->exp_obd->obd_stale_clients++;
1442         }
1443         spin_unlock(&obd->obd_recovery_task_lock);
1444
1445         spin_lock(&exp->exp_lock);
1446         /** Cleanup req replay fields */
1447         if (exp->exp_req_replay_needed) {
1448                 exp->exp_req_replay_needed = 0;
1449
1450                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1451                 atomic_dec(&obd->obd_req_replay_clients);
1452         }
1453
1454         /** Cleanup lock replay data */
1455         if (exp->exp_lock_replay_needed) {
1456                 exp->exp_lock_replay_needed = 0;
1457
1458                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1459                 atomic_dec(&obd->obd_lock_replay_clients);
1460         }
1461         spin_unlock(&exp->exp_lock);
1462 }
1463
1464 /* This function removes 1-3 references from the export:
1465  * 1 - for export pointer passed
1466  * and if disconnect really need
1467  * 2 - removing from hash
1468  * 3 - in client_unlink_export
1469  * The export pointer passed to this function can destroyed */
1470 int class_disconnect(struct obd_export *export)
1471 {
1472         int already_disconnected;
1473         ENTRY;
1474
1475         if (export == NULL) {
1476                 CWARN("attempting to free NULL export %p\n", export);
1477                 RETURN(-EINVAL);
1478         }
1479
1480         spin_lock(&export->exp_lock);
1481         already_disconnected = export->exp_disconnected;
1482         export->exp_disconnected = 1;
1483         /*  We hold references of export for uuid hash
1484          *  and nid_hash and export link at least. So
1485          *  it is safe to call cfs_hash_del in there.  */
1486         if (!hlist_unhashed(&export->exp_nid_hash))
1487                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1488                              &export->exp_connection->c_peer.nid,
1489                              &export->exp_nid_hash);
1490         spin_unlock(&export->exp_lock);
1491
1492         /* class_cleanup(), abort_recovery(), and class_fail_export()
1493          * all end up in here, and if any of them race we shouldn't
1494          * call extra class_export_puts(). */
1495         if (already_disconnected) {
1496                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1497                 GOTO(no_disconn, already_disconnected);
1498         }
1499
1500         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1501                export->exp_handle.h_cookie);
1502
1503         class_export_recovery_cleanup(export);
1504         class_unlink_export(export);
1505 no_disconn:
1506         class_export_put(export);
1507         RETURN(0);
1508 }
1509 EXPORT_SYMBOL(class_disconnect);
1510
1511 /* Return non-zero for a fully connected export */
1512 int class_connected_export(struct obd_export *exp)
1513 {
1514         int connected = 0;
1515
1516         if (exp) {
1517                 spin_lock(&exp->exp_lock);
1518                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1519                 spin_unlock(&exp->exp_lock);
1520         }
1521         return connected;
1522 }
1523 EXPORT_SYMBOL(class_connected_export);
1524
1525 static void class_disconnect_export_list(struct list_head *list,
1526                                          enum obd_option flags)
1527 {
1528         int rc;
1529         struct obd_export *exp;
1530         ENTRY;
1531
1532         /* It's possible that an export may disconnect itself, but
1533          * nothing else will be added to this list. */
1534         while (!list_empty(list)) {
1535                 exp = list_entry(list->next, struct obd_export,
1536                                  exp_obd_chain);
1537                 /* need for safe call CDEBUG after obd_disconnect */
1538                 class_export_get(exp);
1539
1540                 spin_lock(&exp->exp_lock);
1541                 exp->exp_flags = flags;
1542                 spin_unlock(&exp->exp_lock);
1543
1544                 if (obd_uuid_equals(&exp->exp_client_uuid,
1545                                     &exp->exp_obd->obd_uuid)) {
1546                         CDEBUG(D_HA,
1547                                "exp %p export uuid == obd uuid, don't discon\n",
1548                                exp);
1549                         /* Need to delete this now so we don't end up pointing
1550                          * to work_list later when this export is cleaned up. */
1551                         list_del_init(&exp->exp_obd_chain);
1552                         class_export_put(exp);
1553                         continue;
1554                 }
1555
1556                 class_export_get(exp);
1557                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1558                        "last request at %lld\n",
1559                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1560                        exp, exp->exp_last_request_time);
1561                 /* release one export reference anyway */
1562                 rc = obd_disconnect(exp);
1563
1564                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1565                        obd_export_nid2str(exp), exp, rc);
1566                 class_export_put(exp);
1567         }
1568         EXIT;
1569 }
1570
1571 void class_disconnect_exports(struct obd_device *obd)
1572 {
1573         struct list_head work_list;
1574         ENTRY;
1575
1576         /* Move all of the exports from obd_exports to a work list, en masse. */
1577         INIT_LIST_HEAD(&work_list);
1578         spin_lock(&obd->obd_dev_lock);
1579         list_splice_init(&obd->obd_exports, &work_list);
1580         list_splice_init(&obd->obd_delayed_exports, &work_list);
1581         spin_unlock(&obd->obd_dev_lock);
1582
1583         if (!list_empty(&work_list)) {
1584                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1585                        "disconnecting them\n", obd->obd_minor, obd);
1586                 class_disconnect_export_list(&work_list,
1587                                              exp_flags_from_obd(obd));
1588         } else
1589                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1590                        obd->obd_minor, obd);
1591         EXIT;
1592 }
1593 EXPORT_SYMBOL(class_disconnect_exports);
1594
1595 /* Remove exports that have not completed recovery.
1596  */
1597 void class_disconnect_stale_exports(struct obd_device *obd,
1598                                     int (*test_export)(struct obd_export *))
1599 {
1600         struct list_head work_list;
1601         struct obd_export *exp, *n;
1602         int evicted = 0;
1603         ENTRY;
1604
1605         INIT_LIST_HEAD(&work_list);
1606         spin_lock(&obd->obd_dev_lock);
1607         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1608                                  exp_obd_chain) {
1609                 /* don't count self-export as client */
1610                 if (obd_uuid_equals(&exp->exp_client_uuid,
1611                                     &exp->exp_obd->obd_uuid))
1612                         continue;
1613
1614                 /* don't evict clients which have no slot in last_rcvd
1615                  * (e.g. lightweight connection) */
1616                 if (exp->exp_target_data.ted_lr_idx == -1)
1617                         continue;
1618
1619                 spin_lock(&exp->exp_lock);
1620                 if (exp->exp_failed || test_export(exp)) {
1621                         spin_unlock(&exp->exp_lock);
1622                         continue;
1623                 }
1624                 exp->exp_failed = 1;
1625                 spin_unlock(&exp->exp_lock);
1626
1627                 list_move(&exp->exp_obd_chain, &work_list);
1628                 evicted++;
1629                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1630                        obd->obd_name, exp->exp_client_uuid.uuid,
1631                        obd_export_nid2str(exp));
1632                 print_export_data(exp, "EVICTING", 0, D_HA);
1633         }
1634         spin_unlock(&obd->obd_dev_lock);
1635
1636         if (evicted)
1637                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1638                               obd->obd_name, evicted);
1639
1640         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1641                                                  OBD_OPT_ABORT_RECOV);
1642         EXIT;
1643 }
1644 EXPORT_SYMBOL(class_disconnect_stale_exports);
1645
1646 void class_fail_export(struct obd_export *exp)
1647 {
1648         int rc, already_failed;
1649
1650         spin_lock(&exp->exp_lock);
1651         already_failed = exp->exp_failed;
1652         exp->exp_failed = 1;
1653         spin_unlock(&exp->exp_lock);
1654
1655         if (already_failed) {
1656                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1657                        exp, exp->exp_client_uuid.uuid);
1658                 return;
1659         }
1660
1661         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1662                exp, exp->exp_client_uuid.uuid);
1663
1664         if (obd_dump_on_timeout)
1665                 libcfs_debug_dumplog();
1666
1667         /* need for safe call CDEBUG after obd_disconnect */
1668         class_export_get(exp);
1669
1670         /* Most callers into obd_disconnect are removing their own reference
1671          * (request, for example) in addition to the one from the hash table.
1672          * We don't have such a reference here, so make one. */
1673         class_export_get(exp);
1674         rc = obd_disconnect(exp);
1675         if (rc)
1676                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1677         else
1678                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1679                        exp, exp->exp_client_uuid.uuid);
1680         class_export_put(exp);
1681 }
1682 EXPORT_SYMBOL(class_fail_export);
1683
1684 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1685 {
1686         struct cfs_hash *nid_hash;
1687         struct obd_export *doomed_exp = NULL;
1688         int exports_evicted = 0;
1689
1690         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1691
1692         spin_lock(&obd->obd_dev_lock);
1693         /* umount has run already, so evict thread should leave
1694          * its task to umount thread now */
1695         if (obd->obd_stopping) {
1696                 spin_unlock(&obd->obd_dev_lock);
1697                 return exports_evicted;
1698         }
1699         nid_hash = obd->obd_nid_hash;
1700         cfs_hash_getref(nid_hash);
1701         spin_unlock(&obd->obd_dev_lock);
1702
1703         do {
1704                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1705                 if (doomed_exp == NULL)
1706                         break;
1707
1708                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1709                          "nid %s found, wanted nid %s, requested nid %s\n",
1710                          obd_export_nid2str(doomed_exp),
1711                          libcfs_nid2str(nid_key), nid);
1712                 LASSERTF(doomed_exp != obd->obd_self_export,
1713                          "self-export is hashed by NID?\n");
1714                 exports_evicted++;
1715                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1716                               "request\n", obd->obd_name,
1717                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1718                               obd_export_nid2str(doomed_exp));
1719                 class_fail_export(doomed_exp);
1720                 class_export_put(doomed_exp);
1721         } while (1);
1722
1723         cfs_hash_putref(nid_hash);
1724
1725         if (!exports_evicted)
1726                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1727                        obd->obd_name, nid);
1728         return exports_evicted;
1729 }
1730 EXPORT_SYMBOL(obd_export_evict_by_nid);
1731
1732 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1733 {
1734         struct cfs_hash *uuid_hash;
1735         struct obd_export *doomed_exp = NULL;
1736         struct obd_uuid doomed_uuid;
1737         int exports_evicted = 0;
1738
1739         spin_lock(&obd->obd_dev_lock);
1740         if (obd->obd_stopping) {
1741                 spin_unlock(&obd->obd_dev_lock);
1742                 return exports_evicted;
1743         }
1744         uuid_hash = obd->obd_uuid_hash;
1745         cfs_hash_getref(uuid_hash);
1746         spin_unlock(&obd->obd_dev_lock);
1747
1748         obd_str2uuid(&doomed_uuid, uuid);
1749         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1750                 CERROR("%s: can't evict myself\n", obd->obd_name);
1751                 cfs_hash_putref(uuid_hash);
1752                 return exports_evicted;
1753         }
1754
1755         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1756
1757         if (doomed_exp == NULL) {
1758                 CERROR("%s: can't disconnect %s: no exports found\n",
1759                        obd->obd_name, uuid);
1760         } else {
1761                 CWARN("%s: evicting %s at adminstrative request\n",
1762                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1763                 class_fail_export(doomed_exp);
1764                 class_export_put(doomed_exp);
1765                 exports_evicted++;
1766         }
1767         cfs_hash_putref(uuid_hash);
1768
1769         return exports_evicted;
1770 }
1771
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each dumped export
 * when lock dumping is requested; NULL until some other code installs one. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1776
1777 static void print_export_data(struct obd_export *exp, const char *status,
1778                               int locks, int debug_level)
1779 {
1780         struct ptlrpc_reply_state *rs;
1781         struct ptlrpc_reply_state *first_reply = NULL;
1782         int nreplies = 0;
1783
1784         spin_lock(&exp->exp_lock);
1785         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1786                             rs_exp_list) {
1787                 if (nreplies == 0)
1788                         first_reply = rs;
1789                 nreplies++;
1790         }
1791         spin_unlock(&exp->exp_lock);
1792
1793         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1794                "%p %s %llu stale:%d\n",
1795                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1796                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1797                atomic_read(&exp->exp_rpc_count),
1798                atomic_read(&exp->exp_cb_count),
1799                atomic_read(&exp->exp_locks_count),
1800                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1801                nreplies, first_reply, nreplies > 3 ? "..." : "",
1802                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1803 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1804         if (locks && class_export_dump_hook != NULL)
1805                 class_export_dump_hook(exp);
1806 #endif
1807 }
1808
1809 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1810 {
1811         struct obd_export *exp;
1812
1813         spin_lock(&obd->obd_dev_lock);
1814         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1815                 print_export_data(exp, "ACTIVE", locks, debug_level);
1816         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1817                 print_export_data(exp, "UNLINKED", locks, debug_level);
1818         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1819                 print_export_data(exp, "DELAYED", locks, debug_level);
1820         spin_unlock(&obd->obd_dev_lock);
1821 }
1822
1823 void obd_exports_barrier(struct obd_device *obd)
1824 {
1825         int waited = 2;
1826         LASSERT(list_empty(&obd->obd_exports));
1827         spin_lock(&obd->obd_dev_lock);
1828         while (!list_empty(&obd->obd_unlinked_exports)) {
1829                 spin_unlock(&obd->obd_dev_lock);
1830                 set_current_state(TASK_UNINTERRUPTIBLE);
1831                 schedule_timeout(cfs_time_seconds(waited));
1832                 if (waited > 5 && is_power_of_2(waited)) {
1833                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1834                                       "more than %d seconds. "
1835                                       "The obd refcount = %d. Is it stuck?\n",
1836                                       obd->obd_name, waited,
1837                                       atomic_read(&obd->obd_refcount));
1838                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1839                 }
1840                 waited *= 2;
1841                 spin_lock(&obd->obd_dev_lock);
1842         }
1843         spin_unlock(&obd->obd_dev_lock);
1844 }
1845 EXPORT_SYMBOL(obd_exports_barrier);
1846
1847 /**
1848  * Add export to the obd_zombe thread and notify it.
1849  */
1850 static void obd_zombie_export_add(struct obd_export *exp) {
1851         atomic_dec(&obd_stale_export_num);
1852         spin_lock(&exp->exp_obd->obd_dev_lock);
1853         LASSERT(!list_empty(&exp->exp_obd_chain));
1854         list_del_init(&exp->exp_obd_chain);
1855         spin_unlock(&exp->exp_obd->obd_dev_lock);
1856
1857         queue_work(zombie_wq, &exp->exp_zombie_work);
1858 }
1859
1860 /**
1861  * Add import to the obd_zombe thread and notify it.
1862  */
1863 static void obd_zombie_import_add(struct obd_import *imp) {
1864         LASSERT(imp->imp_sec == NULL);
1865
1866         queue_work(zombie_wq, &imp->imp_zombie_work);
1867 }
1868
1869 /**
1870  * wait when obd_zombie import/export queues become empty
1871  */
1872 void obd_zombie_barrier(void)
1873 {
1874         flush_workqueue(zombie_wq);
1875 }
1876 EXPORT_SYMBOL(obd_zombie_barrier);
1877
1878
1879 struct obd_export *obd_stale_export_get(void)
1880 {
1881         struct obd_export *exp = NULL;
1882         ENTRY;
1883
1884         spin_lock(&obd_stale_export_lock);
1885         if (!list_empty(&obd_stale_exports)) {
1886                 exp = list_entry(obd_stale_exports.next,
1887                                  struct obd_export, exp_stale_list);
1888                 list_del_init(&exp->exp_stale_list);
1889         }
1890         spin_unlock(&obd_stale_export_lock);
1891
1892         if (exp) {
1893                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1894                        atomic_read(&obd_stale_export_num));
1895         }
1896         RETURN(exp);
1897 }
1898 EXPORT_SYMBOL(obd_stale_export_get);
1899
1900 void obd_stale_export_put(struct obd_export *exp)
1901 {
1902         ENTRY;
1903
1904         LASSERT(list_empty(&exp->exp_stale_list));
1905         if (exp->exp_lock_hash &&
1906             atomic_read(&exp->exp_lock_hash->hs_count)) {
1907                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1908                        atomic_read(&obd_stale_export_num));
1909
1910                 spin_lock_bh(&exp->exp_bl_list_lock);
1911                 spin_lock(&obd_stale_export_lock);
1912                 /* Add to the tail if there is no blocked locks,
1913                  * to the head otherwise. */
1914                 if (list_empty(&exp->exp_bl_list))
1915                         list_add_tail(&exp->exp_stale_list,
1916                                       &obd_stale_exports);
1917                 else
1918                         list_add(&exp->exp_stale_list,
1919                                  &obd_stale_exports);
1920
1921                 spin_unlock(&obd_stale_export_lock);
1922                 spin_unlock_bh(&exp->exp_bl_list_lock);
1923         } else {
1924                 class_export_put(exp);
1925         }
1926         EXIT;
1927 }
1928 EXPORT_SYMBOL(obd_stale_export_put);
1929
1930 /**
1931  * Adjust the position of the export in the stale list,
1932  * i.e. move to the head of the list if is needed.
1933  **/
1934 void obd_stale_export_adjust(struct obd_export *exp)
1935 {
1936         LASSERT(exp != NULL);
1937         spin_lock_bh(&exp->exp_bl_list_lock);
1938         spin_lock(&obd_stale_export_lock);
1939
1940         if (!list_empty(&exp->exp_stale_list) &&
1941             !list_empty(&exp->exp_bl_list))
1942                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1943
1944         spin_unlock(&obd_stale_export_lock);
1945         spin_unlock_bh(&exp->exp_bl_list_lock);
1946 }
1947 EXPORT_SYMBOL(obd_stale_export_adjust);
1948
1949 /**
1950  * start destroy zombie import/export thread
1951  */
1952 int obd_zombie_impexp_init(void)
1953 {
1954         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1955         if (!zombie_wq)
1956                 return -ENOMEM;
1957
1958         return 0;
1959 }
1960
1961 /**
1962  * stop destroy zombie import/export thread
1963  */
1964 void obd_zombie_impexp_stop(void)
1965 {
1966         destroy_workqueue(zombie_wq);
1967         LASSERT(list_empty(&obd_stale_exports));
1968 }
1969
1970 /***** Kernel-userspace comm helpers *******/
1971
1972 /* Get length of entire message, including header */
1973 int kuc_len(int payload_len)
1974 {
1975         return sizeof(struct kuc_hdr) + payload_len;
1976 }
1977 EXPORT_SYMBOL(kuc_len);
1978
1979 /* Get a pointer to kuc header, given a ptr to the payload
1980  * @param p Pointer to payload area
1981  * @returns Pointer to kuc header
1982  */
1983 struct kuc_hdr * kuc_ptr(void *p)
1984 {
1985         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1986         LASSERT(lh->kuc_magic == KUC_MAGIC);
1987         return lh;
1988 }
1989 EXPORT_SYMBOL(kuc_ptr);
1990
1991 /* Alloc space for a message, and fill in header
1992  * @return Pointer to payload area
1993  */
1994 void *kuc_alloc(int payload_len, int transport, int type)
1995 {
1996         struct kuc_hdr *lh;
1997         int len = kuc_len(payload_len);
1998
1999         OBD_ALLOC(lh, len);
2000         if (lh == NULL)
2001                 return ERR_PTR(-ENOMEM);
2002
2003         lh->kuc_magic = KUC_MAGIC;
2004         lh->kuc_transport = transport;
2005         lh->kuc_msgtype = type;
2006         lh->kuc_msglen = len;
2007
2008         return (void *)(lh + 1);
2009 }
2010 EXPORT_SYMBOL(kuc_alloc);
2011
/* Free a message given a pointer to its payload area.
 * @payload_len must be the payload size, so that the freed size (header +
 * payload) matches what kuc_alloc() computes for the same payload size.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2019
2020 struct obd_request_slot_waiter {
2021         struct list_head        orsw_entry;
2022         wait_queue_head_t       orsw_waitq;
2023         bool                    orsw_signaled;
2024 };
2025
2026 static bool obd_request_slot_avail(struct client_obd *cli,
2027                                    struct obd_request_slot_waiter *orsw)
2028 {
2029         bool avail;
2030
2031         spin_lock(&cli->cl_loi_list_lock);
2032         avail = !!list_empty(&orsw->orsw_entry);
2033         spin_unlock(&cli->cl_loi_list_lock);
2034
2035         return avail;
2036 };
2037
2038 /*
2039  * For network flow control, the RPC sponsor needs to acquire a credit
2040  * before sending the RPC. The credits count for a connection is defined
2041  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2042  * the subsequent RPC sponsors need to wait until others released their
2043  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2044  */
2045 int obd_get_request_slot(struct client_obd *cli)
2046 {
2047         struct obd_request_slot_waiter   orsw;
2048         struct l_wait_info               lwi;
2049         int                              rc;
2050
2051         spin_lock(&cli->cl_loi_list_lock);
2052         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2053                 cli->cl_rpcs_in_flight++;
2054                 spin_unlock(&cli->cl_loi_list_lock);
2055                 return 0;
2056         }
2057
2058         init_waitqueue_head(&orsw.orsw_waitq);
2059         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2060         orsw.orsw_signaled = false;
2061         spin_unlock(&cli->cl_loi_list_lock);
2062
2063         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2064         rc = l_wait_event(orsw.orsw_waitq,
2065                           obd_request_slot_avail(cli, &orsw) ||
2066                           orsw.orsw_signaled,
2067                           &lwi);
2068
2069         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2070          * freed but other (such as obd_put_request_slot) is using it. */
2071         spin_lock(&cli->cl_loi_list_lock);
2072         if (rc != 0) {
2073                 if (!orsw.orsw_signaled) {
2074                         if (list_empty(&orsw.orsw_entry))
2075                                 cli->cl_rpcs_in_flight--;
2076                         else
2077                                 list_del(&orsw.orsw_entry);
2078                 }
2079         }
2080
2081         if (orsw.orsw_signaled) {
2082                 LASSERT(list_empty(&orsw.orsw_entry));
2083
2084                 rc = -EINTR;
2085         }
2086         spin_unlock(&cli->cl_loi_list_lock);
2087
2088         return rc;
2089 }
2090 EXPORT_SYMBOL(obd_get_request_slot);
2091
2092 void obd_put_request_slot(struct client_obd *cli)
2093 {
2094         struct obd_request_slot_waiter *orsw;
2095
2096         spin_lock(&cli->cl_loi_list_lock);
2097         cli->cl_rpcs_in_flight--;
2098
2099         /* If there is free slot, wakeup the first waiter. */
2100         if (!list_empty(&cli->cl_flight_waiters) &&
2101             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2102                 orsw = list_entry(cli->cl_flight_waiters.next,
2103                                   struct obd_request_slot_waiter, orsw_entry);
2104                 list_del_init(&orsw->orsw_entry);
2105                 cli->cl_rpcs_in_flight++;
2106                 wake_up(&orsw->orsw_waitq);
2107         }
2108         spin_unlock(&cli->cl_loi_list_lock);
2109 }
2110 EXPORT_SYMBOL(obd_put_request_slot);
2111
2112 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2113 {
2114         return cli->cl_max_rpcs_in_flight;
2115 }
2116 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2117
2118 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2119 {
2120         struct obd_request_slot_waiter *orsw;
2121         __u32                           old;
2122         int                             diff;
2123         int                             i;
2124         char                            *typ_name;
2125         int                             rc;
2126
2127         if (max > OBD_MAX_RIF_MAX || max < 1)
2128                 return -ERANGE;
2129
2130         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2131         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2132                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2133                  * strictly lower that max_rpcs_in_flight */
2134                 if (max < 2) {
2135                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2136                                "because it must be higher than "
2137                                "max_mod_rpcs_in_flight value",
2138                                cli->cl_import->imp_obd->obd_name);
2139                         return -ERANGE;
2140                 }
2141                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2142                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2143                         if (rc != 0)
2144                                 return rc;
2145                 }
2146         }
2147
2148         spin_lock(&cli->cl_loi_list_lock);
2149         old = cli->cl_max_rpcs_in_flight;
2150         cli->cl_max_rpcs_in_flight = max;
2151         client_adjust_max_dirty(cli);
2152
2153         diff = max - old;
2154
2155         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2156         for (i = 0; i < diff; i++) {
2157                 if (list_empty(&cli->cl_flight_waiters))
2158                         break;
2159
2160                 orsw = list_entry(cli->cl_flight_waiters.next,
2161                                   struct obd_request_slot_waiter, orsw_entry);
2162                 list_del_init(&orsw->orsw_entry);
2163                 cli->cl_rpcs_in_flight++;
2164                 wake_up(&orsw->orsw_waitq);
2165         }
2166         spin_unlock(&cli->cl_loi_list_lock);
2167
2168         return 0;
2169 }
2170 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2171
2172 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2173 {
2174         return cli->cl_max_mod_rpcs_in_flight;
2175 }
2176 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2177
2178 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2179 {
2180         struct obd_connect_data *ocd;
2181         __u16 maxmodrpcs;
2182         __u16 prev;
2183
2184         if (max > OBD_MAX_RIF_MAX || max < 1)
2185                 return -ERANGE;
2186
2187         /* cannot exceed or equal max_rpcs_in_flight */
2188         if (max >= cli->cl_max_rpcs_in_flight) {
2189                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2190                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2191                        cli->cl_import->imp_obd->obd_name,
2192                        max, cli->cl_max_rpcs_in_flight);
2193                 return -ERANGE;
2194         }
2195
2196         /* cannot exceed max modify RPCs in flight supported by the server */
2197         ocd = &cli->cl_import->imp_connect_data;
2198         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2199                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2200         else
2201                 maxmodrpcs = 1;
2202         if (max > maxmodrpcs) {
2203                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2204                        "higher than max_mod_rpcs_per_client value (%hu) "
2205                        "returned by the server at connection\n",
2206                        cli->cl_import->imp_obd->obd_name,
2207                        max, maxmodrpcs);
2208                 return -ERANGE;
2209         }
2210
2211         spin_lock(&cli->cl_mod_rpcs_lock);
2212
2213         prev = cli->cl_max_mod_rpcs_in_flight;
2214         cli->cl_max_mod_rpcs_in_flight = max;
2215
2216         /* wakeup waiters if limit has been increased */
2217         if (cli->cl_max_mod_rpcs_in_flight > prev)
2218                 wake_up(&cli->cl_mod_rpcs_waitq);
2219
2220         spin_unlock(&cli->cl_mod_rpcs_lock);
2221
2222         return 0;
2223 }
2224 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2225
2226 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2227                                struct seq_file *seq)
2228 {
2229         unsigned long mod_tot = 0, mod_cum;
2230         struct timespec64 now;
2231         int i;
2232
2233         ktime_get_real_ts64(&now);
2234
2235         spin_lock(&cli->cl_mod_rpcs_lock);
2236
2237         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2238                    (s64)now.tv_sec, now.tv_nsec);
2239         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2240                    cli->cl_mod_rpcs_in_flight);
2241
2242         seq_printf(seq, "\n\t\t\tmodify\n");
2243         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2244
2245         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2246
2247         mod_cum = 0;
2248         for (i = 0; i < OBD_HIST_MAX; i++) {
2249                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2250                 mod_cum += mod;
2251                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2252                            i, mod, pct(mod, mod_tot),
2253                            pct(mod_cum, mod_tot));
2254                 if (mod_cum == mod_tot)
2255                         break;
2256         }
2257
2258         spin_unlock(&cli->cl_mod_rpcs_lock);
2259
2260         return 0;
2261 }
2262 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2263
2264 /* The number of modify RPCs sent in parallel is limited
2265  * because the server has a finite number of slots per client to
2266  * store request result and ensure reply reconstruction when needed.
2267  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2268  * that takes into account server limit and cl_max_rpcs_in_flight
2269  * value.
2270  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2271  * one close request is allowed above the maximum.
2272  */
2273 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2274                                                  bool close_req)
2275 {
2276         bool avail;
2277
2278         /* A slot is available if
2279          * - number of modify RPCs in flight is less than the max
2280          * - it's a close RPC and no other close request is in flight
2281          */
2282         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2283                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2284
2285         return avail;
2286 }
2287
2288 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2289                                          bool close_req)
2290 {
2291         bool avail;
2292
2293         spin_lock(&cli->cl_mod_rpcs_lock);
2294         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2295         spin_unlock(&cli->cl_mod_rpcs_lock);
2296         return avail;
2297 }
2298
2299 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2300 {
2301         if (it != NULL &&
2302             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2303              it->it_op == IT_READDIR ||
2304              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2305                         return true;
2306         return false;
2307 }
2308
2309 /* Get a modify RPC slot from the obd client @cli according
2310  * to the kind of operation @opc that is going to be sent
2311  * and the intent @it of the operation if it applies.
2312  * If the maximum number of modify RPCs in flight is reached
2313  * the thread is put to sleep.
2314  * Returns the tag to be set in the request message. Tag 0
2315  * is reserved for non-modifying requests.
2316  */
2317 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2318                            struct lookup_intent *it)
2319 {
2320         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2321         bool                    close_req = false;
2322         __u16                   i, max;
2323
2324         /* read-only metadata RPCs don't consume a slot on MDT
2325          * for reply reconstruction
2326          */
2327         if (obd_skip_mod_rpc_slot(it))
2328                 return 0;
2329
2330         if (opc == MDS_CLOSE)
2331                 close_req = true;
2332
2333         do {
2334                 spin_lock(&cli->cl_mod_rpcs_lock);
2335                 max = cli->cl_max_mod_rpcs_in_flight;
2336                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2337                         /* there is a slot available */
2338                         cli->cl_mod_rpcs_in_flight++;
2339                         if (close_req)
2340                                 cli->cl_close_rpcs_in_flight++;
2341                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2342                                          cli->cl_mod_rpcs_in_flight);
2343                         /* find a free tag */
2344                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2345                                                 max + 1);
2346                         LASSERT(i < OBD_MAX_RIF_MAX);
2347                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2348                         spin_unlock(&cli->cl_mod_rpcs_lock);
2349                         /* tag 0 is reserved for non-modify RPCs */
2350                         return i + 1;
2351                 }
2352                 spin_unlock(&cli->cl_mod_rpcs_lock);
2353
2354                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2355                        "opc %u, max %hu\n",
2356                        cli->cl_import->imp_obd->obd_name, opc, max);
2357
2358                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2359                                        obd_mod_rpc_slot_avail(cli, close_req),
2360                                        &lwi);
2361         } while (true);
2362 }
2363 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2364
2365 /* Put a modify RPC slot from the obd client @cli according
2366  * to the kind of operation @opc that has been sent and the
2367  * intent @it of the operation if it applies.
2368  */
2369 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2370                           struct lookup_intent *it, __u16 tag)
2371 {
2372         bool                    close_req = false;
2373
2374         if (obd_skip_mod_rpc_slot(it))
2375                 return;
2376
2377         if (opc == MDS_CLOSE)
2378                 close_req = true;
2379
2380         spin_lock(&cli->cl_mod_rpcs_lock);
2381         cli->cl_mod_rpcs_in_flight--;
2382         if (close_req)
2383                 cli->cl_close_rpcs_in_flight--;
2384         /* release the tag in the bitmap */
2385         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2386         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2387         spin_unlock(&cli->cl_mod_rpcs_lock);
2388         wake_up(&cli->cl_mod_rpcs_waitq);
2389 }
2390 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2391