Whamcloud - gitweb
c057b82e033ed1b39f5b735916bbbdd309ece365
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken around every walk or modification of the obd_types list. */
static DEFINE_SPINLOCK(obd_types_lock);
/* Registered obd_type entries, linked through obd_type::typ_chain. */
static LIST_HEAD(obd_types);
/*
 * Taken by the lookup and (un)registration helpers in this file while they
 * walk or modify obd_devs[].
 */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's slot index is its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that struct obd_device objects are allocated from. */
static struct kmem_cache *obd_device_cachep;

/*
 * NOTE(review): presumably the workqueue that runs deferred ("zombie")
 * export/import teardown - confirm against its users.
 */
static struct workqueue_struct *zombie_wq;

/* Prototypes for static helpers whose definitions come after their users. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/*
 * NOTE(review): presumably the list of stale exports, the lock guarding it
 * and the number of entries on it - confirm against their users.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/*
 * Exported function-pointer hook; it is only declared here.
 * NOTE(review): presumably filled in by ptlrpc at load time - confirm.
 */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
/* kobj_type release hook: free the kobject allocated by class_setup_tunables(). */
static void class_sysfs_release(struct kobject *kobj)
{
        OBD_FREE_PTR(kobj);
}
165
/*
 * kobject type for the kobjects created by class_setup_tunables(): attribute
 * access goes through lustre_sysfs_ops and the object is freed by
 * class_sysfs_release().
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
170
171 struct kobject *class_setup_tunables(const char *name)
172 {
173         struct kobject *kobj;
174         int rc;
175
176 #ifdef HAVE_SERVER_SUPPORT
177         kobj = kset_find_obj(lustre_kset, name);
178         if (kobj)
179                 return kobj;
180 #endif
181         OBD_ALLOC(kobj, sizeof(*kobj));
182         if (!kobj)
183                 return ERR_PTR(-ENOMEM);
184
185         kobj->kset = lustre_kset;
186         kobject_init(kobj, &class_ktype);
187         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
188         if (rc) {
189                 kobject_put(kobj);
190                 return ERR_PTR(rc);
191         }
192         return kobj;
193 }
194 EXPORT_SYMBOL(class_setup_tunables);
195
196 #define CLASS_MAX_NAME 1024
197
/*
 * Register a new obd type.
 *
 * Allocates an obd_type, stores private copies of \a dt_ops and (when given)
 * \a md_ops in it, creates its procfs (optional), debugfs and sysfs entries,
 * initializes \a ldt when one is supplied and finally links the type into
 * obd_types.
 *
 * \param[in] dt_ops       device operations, copied into the type
 * \param[in] md_ops       metadata operations, copied when not NULL
 * \param[in] enable_proc  when true (and CONFIG_PROC_FS is set) register a
 *                         procfs directory populated from \a vars
 * \param[in] vars         procfs variables handed to lprocfs_register()
 * \param[in] name         type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type to initialize
 *
 * \retval 0        success
 * \retval -EEXIST  a type named \a name is already registered
 * \retval -ENOMEM  allocation failure
 * \retval other    error reported by procfs/debugfs/sysfs setup or by
 *                  lu_device_type_init()
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
#ifdef HAVE_SERVER_SUPPORT
        struct qstr dname;
#endif /* HAVE_SERVER_SUPPORT */
        int rc = 0;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        if (class_search_type(name)) {
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        /* Every allocation failure below reports -ENOMEM. */
        rc = -ENOMEM;
        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(rc);

        /* Allocate all three members first, then check them in one go; the
         * failed: path frees whichever of them were obtained. */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);
        OBD_ALLOC(type->typ_name, strlen(name) + 1);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL ||
            type->typ_name == NULL)
                GOTO (failed, rc);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);

#ifdef CONFIG_PROC_FS
        if (enable_proc) {
                type->typ_procroot = lprocfs_register(type->typ_name,
                                                      proc_lustre_root,
                                                      vars, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* Cleared so that failed: does not try to remove it. */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
#ifdef HAVE_SERVER_SUPPORT
        /* If a debugfs directory with this name already exists, reuse it:
         * drop the lookup reference, flag the type with typ_sym_filter and
         * skip creating a directory of our own. */
        dname.name = name;
        dname.len = strlen(dname.name);
        dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
                                       dname.len);
        type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
        if (type->typ_debugfs_entry) {
                dput(type->typ_debugfs_entry);
                type->typ_sym_filter = true;
                goto dir_exist;
        }
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
                                                    debugfs_lustre_root,
                                                    NULL, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                /* A NULL result is reported as -ENOMEM. */
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif
        type->typ_kobj = class_setup_tunables(type->typ_name);
        if (IS_ERR(type->typ_kobj))
                GOTO(failed, rc = PTR_ERR(type->typ_kobj));

        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc) {
                        /* The kobject is the only resource failed: does not
                         * release, so drop it here. */
                        kobject_put(type->typ_kobj);
                        GOTO(failed, rc);
                }
        }

        /* Fully set up: make the type visible to class_search_type(). */
        spin_lock(&obd_types_lock);
        list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN(0);

failed:
#ifdef HAVE_SERVER_SUPPORT
        /* A reused debugfs directory is not ours to remove. */
        if (type->typ_sym_filter)
                type->typ_debugfs_entry = NULL;
#endif
        if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
                ldebugfs_remove(&type->typ_debugfs_entry);
        if (type->typ_name != NULL) {
#ifdef CONFIG_PROC_FS
                if (type->typ_procroot != NULL)
                        remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
                OBD_FREE(type->typ_name, strlen(name) + 1);
        }
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
316
/*
 * Unregister the obd type called \a name and free everything
 * class_register_type() set up for it.
 *
 * \retval 0        success
 * \retval -EINVAL  no type of that name is registered
 * \retval -EBUSY   the type still has references (typ_refcnt != 0); its ops
 *                  tables are freed but the type itself stays registered
 */
int class_unregister_type(const char *name)
{
        struct obd_type *type = class_search_type(name);
        ENTRY;

        if (!type) {
                CERROR("unknown obd type\n");
                RETURN(-EINVAL);
        }

        if (type->typ_refcnt) {
                CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
                /* This is a bad situation, let's make the best of it */
                /* Remove ops, but leave the name for debugging */
                /* NOTE(review): the type stays on obd_types with typ_dt_ops
                 * and typ_md_ops freed; class_get_type()/class_put_type()
                 * dereference typ_dt_ops, and a second unregister attempt
                 * would free both again - confirm this is acceptable. */
                OBD_FREE_PTR(type->typ_dt_ops);
                OBD_FREE_PTR(type->typ_md_ops);
                RETURN(-EBUSY);
        }

        kobject_put(type->typ_kobj);

        /* We do not use the type->typ_procroot pointer itself for removal:
         * for compatibility purposes other modules can share names (i.e. lod
         * can use the lov entry), so the pointer can become invalid when
         * another module removes the entry.  Remove by name instead. */
#ifdef CONFIG_PROC_FS
        if (type->typ_procroot != NULL)
                remove_proc_subtree(type->typ_name, proc_lustre_root);
        if (type->typ_procsym != NULL)
                lprocfs_remove(&type->typ_procsym);
#endif
#ifdef HAVE_SERVER_SUPPORT
        /* A debugfs directory that was reused at registration time is not
         * ours to remove. */
        if (type->typ_sym_filter)
                type->typ_debugfs_entry = NULL;
#endif
        if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
                ldebugfs_remove(&type->typ_debugfs_entry);

        if (type->typ_lu)
                lu_device_type_fini(type->typ_lu);

        /* Unlink first so the type can no longer be found, then free it. */
        spin_lock(&obd_types_lock);
        list_del(&type->typ_chain);
        spin_unlock(&obd_types_lock);
        /* name matched typ_name in class_search_type(), so the lengths agree */
        OBD_FREE(type->typ_name, strlen(name) + 1);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(0);
} /* class_unregister_type */
EXPORT_SYMBOL(class_unregister_type);
370
371 /**
372  * Create a new obd device.
373  *
374  * Allocate the new obd_device and initialize it.
375  *
376  * \param[in] type_name obd device type string.
377  * \param[in] name      obd device name.
378  * \param[in] uuid      obd device UUID
379  *
380  * \retval newdev         pointer to created obd_device
381  * \retval ERR_PTR(errno) on error
382  */
383 struct obd_device *class_newdev(const char *type_name, const char *name,
384                                 const char *uuid)
385 {
386         struct obd_device *newdev;
387         struct obd_type *type = NULL;
388         ENTRY;
389
390         if (strlen(name) >= MAX_OBD_NAME) {
391                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392                 RETURN(ERR_PTR(-EINVAL));
393         }
394
395         type = class_get_type(type_name);
396         if (type == NULL){
397                 CERROR("OBD: unknown type: %s\n", type_name);
398                 RETURN(ERR_PTR(-ENODEV));
399         }
400
401         newdev = obd_device_alloc();
402         if (newdev == NULL) {
403                 class_put_type(type);
404                 RETURN(ERR_PTR(-ENOMEM));
405         }
406         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408         newdev->obd_type = type;
409         newdev->obd_minor = -1;
410
411         rwlock_init(&newdev->obd_pool_lock);
412         newdev->obd_pool_limit = 0;
413         newdev->obd_pool_slv = 0;
414
415         INIT_LIST_HEAD(&newdev->obd_exports);
416         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418         INIT_LIST_HEAD(&newdev->obd_exports_timed);
419         INIT_LIST_HEAD(&newdev->obd_nid_stats);
420         spin_lock_init(&newdev->obd_nid_lock);
421         spin_lock_init(&newdev->obd_dev_lock);
422         mutex_init(&newdev->obd_dev_mutex);
423         spin_lock_init(&newdev->obd_osfs_lock);
424         /* newdev->obd_osfs_age must be set to a value in the distant
425          * past to guarantee a fresh statfs is fetched on mount. */
426         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
427
428         /* XXX belongs in setup not attach  */
429         init_rwsem(&newdev->obd_observer_link_sem);
430         /* recovery data */
431         spin_lock_init(&newdev->obd_recovery_task_lock);
432         init_waitqueue_head(&newdev->obd_next_transno_waitq);
433         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437         INIT_LIST_HEAD(&newdev->obd_evict_list);
438         INIT_LIST_HEAD(&newdev->obd_lwp_list);
439
440         llog_group_init(&newdev->obd_olg);
441         /* Detach drops this */
442         atomic_set(&newdev->obd_refcount, 1);
443         lu_ref_init(&newdev->obd_reference);
444         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
445
446         newdev->obd_conn_inprogress = 0;
447
448         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
449
450         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451                newdev->obd_name, newdev);
452
453         return newdev;
454 }
455
456 /**
457  * Free obd device.
458  *
459  * \param[in] obd obd_device to be freed
460  *
461  * \retval none
462  */
463 void class_free_dev(struct obd_device *obd)
464 {
465         struct obd_type *obd_type = obd->obd_type;
466
467         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470                  "obd %p != obd_devs[%d] %p\n",
471                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473                  "obd_refcount should be 0, not %d\n",
474                  atomic_read(&obd->obd_refcount));
475         LASSERT(obd_type != NULL);
476
477         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478                obd->obd_name, obd->obd_type->typ_name);
479
480         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481                          obd->obd_name, obd->obd_uuid.uuid);
482         if (obd->obd_stopping) {
483                 int err;
484
485                 /* If we're not stopping, we were never set up */
486                 err = obd_cleanup(obd);
487                 if (err)
488                         CERROR("Cleanup %s returned %d\n",
489                                 obd->obd_name, err);
490         }
491
492         obd_device_free(obd);
493
494         class_put_type(obd_type);
495 }
496
497 /**
498  * Unregister obd device.
499  *
500  * Free slot in obd_dev[] used by \a obd.
501  *
502  * \param[in] new_obd obd_device to be unregistered
503  *
504  * \retval none
505  */
506 void class_unregister_device(struct obd_device *obd)
507 {
508         write_lock(&obd_dev_lock);
509         if (obd->obd_minor >= 0) {
510                 LASSERT(obd_devs[obd->obd_minor] == obd);
511                 obd_devs[obd->obd_minor] = NULL;
512                 obd->obd_minor = -1;
513         }
514         write_unlock(&obd_dev_lock);
515 }
516
517 /**
518  * Register obd device.
519  *
520  * Find free slot in obd_devs[], fills it with \a new_obd.
521  *
522  * \param[in] new_obd obd_device to be registered
523  *
524  * \retval 0          success
525  * \retval -EEXIST    device with this name is registered
526  * \retval -EOVERFLOW obd_devs[] is full
527  */
528 int class_register_device(struct obd_device *new_obd)
529 {
530         int ret = 0;
531         int i;
532         int new_obd_minor = 0;
533         bool minor_assign = false;
534         bool retried = false;
535
536 again:
537         write_lock(&obd_dev_lock);
538         for (i = 0; i < class_devno_max(); i++) {
539                 struct obd_device *obd = class_num2obd(i);
540
541                 if (obd != NULL &&
542                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
543
544                         if (!retried) {
545                                 write_unlock(&obd_dev_lock);
546
547                                 /* the obd_device could be waited to be
548                                  * destroyed by the "obd_zombie_impexp_thread".
549                                  */
550                                 obd_zombie_barrier();
551                                 retried = true;
552                                 goto again;
553                         }
554
555                         CERROR("%s: already exists, won't add\n",
556                                obd->obd_name);
557                         /* in case we found a free slot before duplicate */
558                         minor_assign = false;
559                         ret = -EEXIST;
560                         break;
561                 }
562                 if (!minor_assign && obd == NULL) {
563                         new_obd_minor = i;
564                         minor_assign = true;
565                 }
566         }
567
568         if (minor_assign) {
569                 new_obd->obd_minor = new_obd_minor;
570                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572                 obd_devs[new_obd_minor] = new_obd;
573         } else {
574                 if (ret == 0) {
575                         ret = -EOVERFLOW;
576                         CERROR("%s: all %u/%u devices used, increase "
577                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578                                i, class_devno_max(), ret);
579                 }
580         }
581         write_unlock(&obd_dev_lock);
582
583         RETURN(ret);
584 }
585
586 static int class_name2dev_nolock(const char *name)
587 {
588         int i;
589
590         if (!name)
591                 return -1;
592
593         for (i = 0; i < class_devno_max(); i++) {
594                 struct obd_device *obd = class_num2obd(i);
595
596                 if (obd && strcmp(name, obd->obd_name) == 0) {
597                         /* Make sure we finished attaching before we give
598                            out any references */
599                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600                         if (obd->obd_attached) {
601                                 return i;
602                         }
603                         break;
604                 }
605         }
606
607         return -1;
608 }
609
610 int class_name2dev(const char *name)
611 {
612         int i;
613
614         if (!name)
615                 return -1;
616
617         read_lock(&obd_dev_lock);
618         i = class_name2dev_nolock(name);
619         read_unlock(&obd_dev_lock);
620
621         return i;
622 }
623 EXPORT_SYMBOL(class_name2dev);
624
625 struct obd_device *class_name2obd(const char *name)
626 {
627         int dev = class_name2dev(name);
628
629         if (dev < 0 || dev > class_devno_max())
630                 return NULL;
631         return class_num2obd(dev);
632 }
633 EXPORT_SYMBOL(class_name2obd);
634
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
636 {
637         int i;
638
639         for (i = 0; i < class_devno_max(); i++) {
640                 struct obd_device *obd = class_num2obd(i);
641
642                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
644                         return i;
645                 }
646         }
647
648         return -1;
649 }
650
651 int class_uuid2dev(struct obd_uuid *uuid)
652 {
653         int i;
654
655         read_lock(&obd_dev_lock);
656         i = class_uuid2dev_nolock(uuid);
657         read_unlock(&obd_dev_lock);
658
659         return i;
660 }
661 EXPORT_SYMBOL(class_uuid2dev);
662
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
664 {
665         int dev = class_uuid2dev(uuid);
666         if (dev < 0)
667                 return NULL;
668         return class_num2obd(dev);
669 }
670 EXPORT_SYMBOL(class_uuid2obd);
671
672 /**
673  * Get obd device from ::obd_devs[]
674  *
675  * \param num [in] array index
676  *
677  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678  *         otherwise return the obd device there.
679  */
680 struct obd_device *class_num2obd(int num)
681 {
682         struct obd_device *obd = NULL;
683
684         if (num < class_devno_max()) {
685                 obd = obd_devs[num];
686                 if (obd == NULL)
687                         return NULL;
688
689                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690                          "%p obd_magic %08x != %08x\n",
691                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692                 LASSERTF(obd->obd_minor == num,
693                          "%p obd_minor %0d != %0d\n",
694                          obd, obd->obd_minor, num);
695         }
696
697         return obd;
698 }
699
700 /**
701  * Find obd in obd_dev[] by name or uuid.
702  *
703  * Increment obd's refcount if found.
704  *
705  * \param[in] str obd name or uuid
706  *
707  * \retval NULL    if not found
708  * \retval target  pointer to found obd_device
709  */
710 struct obd_device *class_dev_by_str(const char *str)
711 {
712         struct obd_device *target = NULL;
713         struct obd_uuid tgtuuid;
714         int rc;
715
716         obd_str2uuid(&tgtuuid, str);
717
718         read_lock(&obd_dev_lock);
719         rc = class_uuid2dev_nolock(&tgtuuid);
720         if (rc < 0)
721                 rc = class_name2dev_nolock(str);
722
723         if (rc >= 0)
724                 target = class_num2obd(rc);
725
726         if (target != NULL)
727                 class_incref(target, "find", current);
728         read_unlock(&obd_dev_lock);
729
730         RETURN(target);
731 }
732 EXPORT_SYMBOL(class_dev_by_str);
733
734 /**
735  * Get obd devices count. Device in any
736  *    state are counted
737  * \retval obd device count
738  */
739 int get_devices_count(void)
740 {
741         int index, max_index = class_devno_max(), dev_count = 0;
742
743         read_lock(&obd_dev_lock);
744         for (index = 0; index <= max_index; index++) {
745                 struct obd_device *obd = class_num2obd(index);
746                 if (obd != NULL)
747                         dev_count++;
748         }
749         read_unlock(&obd_dev_lock);
750
751         return dev_count;
752 }
753 EXPORT_SYMBOL(get_devices_count);
754
755 void class_obd_list(void)
756 {
757         char *status;
758         int i;
759
760         read_lock(&obd_dev_lock);
761         for (i = 0; i < class_devno_max(); i++) {
762                 struct obd_device *obd = class_num2obd(i);
763
764                 if (obd == NULL)
765                         continue;
766                 if (obd->obd_stopping)
767                         status = "ST";
768                 else if (obd->obd_set_up)
769                         status = "UP";
770                 else if (obd->obd_attached)
771                         status = "AT";
772                 else
773                         status = "--";
774                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775                          i, status, obd->obd_type->typ_name,
776                          obd->obd_name, obd->obd_uuid.uuid,
777                          atomic_read(&obd->obd_refcount));
778         }
779         read_unlock(&obd_dev_lock);
780         return;
781 }
782
783 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
784    specified, then only the client with that uuid is returned,
785    otherwise any client connected to the tgt is returned. */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787                                           const char * typ_name,
788                                           struct obd_uuid *grp_uuid)
789 {
790         int i;
791
792         read_lock(&obd_dev_lock);
793         for (i = 0; i < class_devno_max(); i++) {
794                 struct obd_device *obd = class_num2obd(i);
795
796                 if (obd == NULL)
797                         continue;
798                 if ((strncmp(obd->obd_type->typ_name, typ_name,
799                              strlen(typ_name)) == 0)) {
800                         if (obd_uuid_equals(tgt_uuid,
801                                             &obd->u.cli.cl_target_uuid) &&
802                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
803                                                          &obd->obd_uuid) : 1)) {
804                                 read_unlock(&obd_dev_lock);
805                                 return obd;
806                         }
807                 }
808         }
809         read_unlock(&obd_dev_lock);
810
811         return NULL;
812 }
813 EXPORT_SYMBOL(class_find_client_obd);
814
815 /* Iterate the obd_device list looking devices have grp_uuid. Start
816    searching at *next, and if a device is found, the next index to look
817    at is saved in *next. If next is NULL, then the first matching device
818    will always be returned. */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
820 {
821         int i;
822
823         if (next == NULL)
824                 i = 0;
825         else if (*next >= 0 && *next < class_devno_max())
826                 i = *next;
827         else
828                 return NULL;
829
830         read_lock(&obd_dev_lock);
831         for (; i < class_devno_max(); i++) {
832                 struct obd_device *obd = class_num2obd(i);
833
834                 if (obd == NULL)
835                         continue;
836                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
837                         if (next != NULL)
838                                 *next = i+1;
839                         read_unlock(&obd_dev_lock);
840                         return obd;
841                 }
842         }
843         read_unlock(&obd_dev_lock);
844
845         return NULL;
846 }
847 EXPORT_SYMBOL(class_devices_in_group);
848
849 /**
850  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
851  * adjust sptlrpc settings accordingly.
852  */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
854 {
855         struct obd_device  *obd;
856         const char         *type;
857         int                 i, rc = 0, rc2;
858
859         LASSERT(namelen > 0);
860
861         read_lock(&obd_dev_lock);
862         for (i = 0; i < class_devno_max(); i++) {
863                 obd = class_num2obd(i);
864
865                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
866                         continue;
867
868                 /* only notify mdc, osc, osp, lwp, mdt, ost
869                  * because only these have a -sptlrpc llog */
870                 type = obd->obd_type->typ_name;
871                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876                     strcmp(type, LUSTRE_OST_NAME) != 0)
877                         continue;
878
879                 if (strncmp(obd->obd_name, fsname, namelen))
880                         continue;
881
882                 class_incref(obd, __FUNCTION__, obd);
883                 read_unlock(&obd_dev_lock);
884                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885                                          sizeof(KEY_SPTLRPC_CONF),
886                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
887                 rc = rc ? rc : rc2;
888                 class_decref(obd, __FUNCTION__, obd);
889                 read_lock(&obd_dev_lock);
890         }
891         read_unlock(&obd_dev_lock);
892         return rc;
893 }
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
895
896 void obd_cleanup_caches(void)
897 {
898         ENTRY;
899         if (obd_device_cachep) {
900                 kmem_cache_destroy(obd_device_cachep);
901                 obd_device_cachep = NULL;
902         }
903
904         EXIT;
905 }
906
907 int obd_init_caches(void)
908 {
909         int rc;
910         ENTRY;
911
912         LASSERT(obd_device_cachep == NULL);
913         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
914                                 sizeof(struct obd_device),
915                                 0, 0, 0, sizeof(struct obd_device), NULL);
916         if (!obd_device_cachep)
917                 GOTO(out, rc = -ENOMEM);
918
919         RETURN(0);
920 out:
921         obd_cleanup_caches();
922         RETURN(rc);
923 }
924
925 /* map connection to client */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
927 {
928         struct obd_export *export;
929         ENTRY;
930
931         if (!conn) {
932                 CDEBUG(D_CACHE, "looking for null handle\n");
933                 RETURN(NULL);
934         }
935
936         if (conn->cookie == -1) {  /* this means assign a new connection */
937                 CDEBUG(D_CACHE, "want a new connection\n");
938                 RETURN(NULL);
939         }
940
941         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942         export = class_handle2object(conn->cookie, NULL);
943         RETURN(export);
944 }
945 EXPORT_SYMBOL(class_conn2export);
946
947 struct obd_device *class_exp2obd(struct obd_export *exp)
948 {
949         if (exp)
950                 return exp->exp_obd;
951         return NULL;
952 }
953 EXPORT_SYMBOL(class_exp2obd);
954
955 struct obd_import *class_exp2cliimp(struct obd_export *exp)
956 {
957         struct obd_device *obd = exp->exp_obd;
958         if (obd == NULL)
959                 return NULL;
960         return obd->u.cli.cl_import;
961 }
962 EXPORT_SYMBOL(class_exp2cliimp);
963
964 /* Export management functions */
965 static void class_export_destroy(struct obd_export *exp)
966 {
967         struct obd_device *obd = exp->exp_obd;
968         ENTRY;
969
970         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
971         LASSERT(obd != NULL);
972
973         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
974                exp->exp_client_uuid.uuid, obd->obd_name);
975
976         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
977         if (exp->exp_connection)
978                 ptlrpc_put_connection_superhack(exp->exp_connection);
979
980         LASSERT(list_empty(&exp->exp_outstanding_replies));
981         LASSERT(list_empty(&exp->exp_uncommitted_replies));
982         LASSERT(list_empty(&exp->exp_req_replay_queue));
983         LASSERT(list_empty(&exp->exp_hp_rpcs));
984         obd_destroy_export(exp);
985         /* self export doesn't hold a reference to an obd, although it
986          * exists until freeing of the obd */
987         if (exp != obd->obd_self_export)
988                 class_decref(obd, "export", exp);
989
990         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
991         EXIT;
992 }
993
/* hop_addref callback for export handles: take one export reference. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
998
/* Handle operations registered for every export handle; this table is
 * passed to class_handle_hash() when a new export is created.
 * NOTE(review): no free callback is provided; exports appear to be freed
 * through OBD_FREE_RCU() in class_export_destroy() instead - confirm. */
static struct portals_handle_ops export_handle_ops = {
        /* takes an export reference via class_export_get() */
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
1003
1004 struct obd_export *class_export_get(struct obd_export *exp)
1005 {
1006         atomic_inc(&exp->exp_refcount);
1007         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1008                atomic_read(&exp->exp_refcount));
1009         return exp;
1010 }
1011 EXPORT_SYMBOL(class_export_get);
1012
1013 void class_export_put(struct obd_export *exp)
1014 {
1015         LASSERT(exp != NULL);
1016         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1017         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1018                atomic_read(&exp->exp_refcount) - 1);
1019
1020         if (atomic_dec_and_test(&exp->exp_refcount)) {
1021                 struct obd_device *obd = exp->exp_obd;
1022
1023                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1024                        exp, exp->exp_client_uuid.uuid);
1025
1026                 /* release nid stat refererence */
1027                 lprocfs_exp_cleanup(exp);
1028
1029                 if (exp == obd->obd_self_export) {
1030                         /* self export should be destroyed without
1031                          * zombie thread as it doesn't hold a
1032                          * reference to obd and doesn't hold any
1033                          * resources */
1034                         class_export_destroy(exp);
1035                         /* self export is destroyed, no class
1036                          * references exist and it is safe to free
1037                          * obd */
1038                         class_free_dev(obd);
1039                 } else {
1040                         LASSERT(!list_empty(&exp->exp_obd_chain));
1041                         obd_zombie_export_add(exp);
1042                 }
1043
1044         }
1045 }
1046 EXPORT_SYMBOL(class_export_put);
1047
1048 static void obd_zombie_exp_cull(struct work_struct *ws)
1049 {
1050         struct obd_export *export;
1051
1052         export = container_of(ws, struct obd_export, exp_zombie_work);
1053         class_export_destroy(export);
1054 }
1055
1056 /* Creates a new export, adds it to the hash table, and returns a
1057  * pointer to it. The refcount is 2: one for the hash reference, and
1058  * one for the pointer returned by this function. */
1059 struct obd_export *__class_new_export(struct obd_device *obd,
1060                                       struct obd_uuid *cluuid, bool is_self)
1061 {
1062         struct obd_export *export;
1063         struct cfs_hash *hash = NULL;
1064         int rc = 0;
1065         ENTRY;
1066
1067         OBD_ALLOC_PTR(export);
1068         if (!export)
1069                 return ERR_PTR(-ENOMEM);
1070
1071         export->exp_conn_cnt = 0;
1072         export->exp_lock_hash = NULL;
1073         export->exp_flock_hash = NULL;
1074         /* 2 = class_handle_hash + last */
1075         atomic_set(&export->exp_refcount, 2);
1076         atomic_set(&export->exp_rpc_count, 0);
1077         atomic_set(&export->exp_cb_count, 0);
1078         atomic_set(&export->exp_locks_count, 0);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080         INIT_LIST_HEAD(&export->exp_locks_list);
1081         spin_lock_init(&export->exp_locks_list_guard);
1082 #endif
1083         atomic_set(&export->exp_replay_count, 0);
1084         export->exp_obd = obd;
1085         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1086         spin_lock_init(&export->exp_uncommitted_replies_lock);
1087         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1088         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1089         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1090         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1091         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1092         class_handle_hash(&export->exp_handle, &export_handle_ops);
1093         export->exp_last_request_time = ktime_get_real_seconds();
1094         spin_lock_init(&export->exp_lock);
1095         spin_lock_init(&export->exp_rpc_lock);
1096         INIT_HLIST_NODE(&export->exp_uuid_hash);
1097         INIT_HLIST_NODE(&export->exp_nid_hash);
1098         INIT_HLIST_NODE(&export->exp_gen_hash);
1099         spin_lock_init(&export->exp_bl_list_lock);
1100         INIT_LIST_HEAD(&export->exp_bl_list);
1101         INIT_LIST_HEAD(&export->exp_stale_list);
1102         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1103
1104         export->exp_sp_peer = LUSTRE_SP_ANY;
1105         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1106         export->exp_client_uuid = *cluuid;
1107         obd_init_export(export);
1108
1109         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1110                 spin_lock(&obd->obd_dev_lock);
1111                 /* shouldn't happen, but might race */
1112                 if (obd->obd_stopping)
1113                         GOTO(exit_unlock, rc = -ENODEV);
1114
1115                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1116                 if (hash == NULL)
1117                         GOTO(exit_unlock, rc = -ENODEV);
1118                 spin_unlock(&obd->obd_dev_lock);
1119
1120                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1121                 if (rc != 0) {
1122                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1123                                       obd->obd_name, cluuid->uuid, rc);
1124                         GOTO(exit_err, rc = -EALREADY);
1125                 }
1126         }
1127
1128         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1129         spin_lock(&obd->obd_dev_lock);
1130         if (obd->obd_stopping) {
1131                 if (hash)
1132                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1133                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1134         }
1135
1136         if (!is_self) {
1137                 class_incref(obd, "export", export);
1138                 list_add_tail(&export->exp_obd_chain_timed,
1139                               &obd->obd_exports_timed);
1140                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1141                 obd->obd_num_exports++;
1142         } else {
1143                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1144                 INIT_LIST_HEAD(&export->exp_obd_chain);
1145         }
1146         spin_unlock(&obd->obd_dev_lock);
1147         if (hash)
1148                 cfs_hash_putref(hash);
1149         RETURN(export);
1150
1151 exit_unlock:
1152         spin_unlock(&obd->obd_dev_lock);
1153 exit_err:
1154         if (hash)
1155                 cfs_hash_putref(hash);
1156         class_handle_unhash(&export->exp_handle);
1157         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1158         obd_destroy_export(export);
1159         OBD_FREE_PTR(export);
1160         return ERR_PTR(rc);
1161 }
1162
/* Create a regular (non-self) export of \a obd for the client \a uuid;
 * see __class_new_export() for the return values. */
struct obd_export *class_new_export(struct obd_device *obd,
				    struct obd_uuid *uuid)
{
	const bool is_self = false;

	return __class_new_export(obd, uuid, is_self);
}
EXPORT_SYMBOL(class_new_export);
1169
/* Create the self export of \a obd; see __class_new_export() for the
 * return values. */
struct obd_export *class_new_export_self(struct obd_device *obd,
					 struct obd_uuid *uuid)
{
	const bool is_self = true;

	return __class_new_export(obd, uuid, is_self);
}
1175
1176 void class_unlink_export(struct obd_export *exp)
1177 {
1178         class_handle_unhash(&exp->exp_handle);
1179
1180         if (exp->exp_obd->obd_self_export == exp) {
1181                 class_export_put(exp);
1182                 return;
1183         }
1184
1185         spin_lock(&exp->exp_obd->obd_dev_lock);
1186         /* delete an uuid-export hashitem from hashtables */
1187         if (!hlist_unhashed(&exp->exp_uuid_hash))
1188                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1189                              &exp->exp_client_uuid,
1190                              &exp->exp_uuid_hash);
1191
1192 #ifdef HAVE_SERVER_SUPPORT
1193         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1194                 struct tg_export_data   *ted = &exp->exp_target_data;
1195                 struct cfs_hash         *hash;
1196
1197                 /* Because obd_gen_hash will not be released until
1198                  * class_cleanup(), so hash should never be NULL here */
1199                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1200                 LASSERT(hash != NULL);
1201                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1202                              &exp->exp_gen_hash);
1203                 cfs_hash_putref(hash);
1204         }
1205 #endif /* HAVE_SERVER_SUPPORT */
1206
1207         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1208         list_del_init(&exp->exp_obd_chain_timed);
1209         exp->exp_obd->obd_num_exports--;
1210         spin_unlock(&exp->exp_obd->obd_dev_lock);
1211         atomic_inc(&obd_stale_export_num);
1212
1213         /* A reference is kept by obd_stale_exports list */
1214         obd_stale_export_put(exp);
1215 }
1216 EXPORT_SYMBOL(class_unlink_export);
1217
1218 /* Import management functions */
1219 static void obd_zombie_import_free(struct obd_import *imp)
1220 {
1221         ENTRY;
1222
1223         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1224                 imp->imp_obd->obd_name);
1225
1226         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1227
1228         ptlrpc_put_connection_superhack(imp->imp_connection);
1229
1230         while (!list_empty(&imp->imp_conn_list)) {
1231                 struct obd_import_conn *imp_conn;
1232
1233                 imp_conn = list_entry(imp->imp_conn_list.next,
1234                                       struct obd_import_conn, oic_item);
1235                 list_del_init(&imp_conn->oic_item);
1236                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1237                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1238         }
1239
1240         LASSERT(imp->imp_sec == NULL);
1241         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1242                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1243         class_decref(imp->imp_obd, "import", imp);
1244         OBD_FREE_PTR(imp);
1245         EXIT;
1246 }
1247
1248 struct obd_import *class_import_get(struct obd_import *import)
1249 {
1250         atomic_inc(&import->imp_refcount);
1251         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1252                atomic_read(&import->imp_refcount),
1253                import->imp_obd->obd_name);
1254         return import;
1255 }
1256 EXPORT_SYMBOL(class_import_get);
1257
1258 void class_import_put(struct obd_import *imp)
1259 {
1260         ENTRY;
1261
1262         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1263
1264         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1265                atomic_read(&imp->imp_refcount) - 1,
1266                imp->imp_obd->obd_name);
1267
1268         if (atomic_dec_and_test(&imp->imp_refcount)) {
1269                 CDEBUG(D_INFO, "final put import %p\n", imp);
1270                 obd_zombie_import_add(imp);
1271         }
1272
1273         EXIT;
1274 }
1275 EXPORT_SYMBOL(class_import_put);
1276
1277 static void init_imp_at(struct imp_at *at) {
1278         int i;
1279         at_init(&at->iat_net_latency, 0, 0);
1280         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1281                 /* max service estimates are tracked on the server side, so
1282                    don't use the AT history here, just use the last reported
1283                    val. (But keep hist for proc histogram, worst_ever) */
1284                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1285                         AT_FLG_NOHIST);
1286         }
1287 }
1288
1289 static void obd_zombie_imp_cull(struct work_struct *ws)
1290 {
1291         struct obd_import *import;
1292
1293         import = container_of(ws, struct obd_import, imp_zombie_work);
1294         obd_zombie_import_free(import);
1295 }
1296
1297 struct obd_import *class_new_import(struct obd_device *obd)
1298 {
1299         struct obd_import *imp;
1300         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1301
1302         OBD_ALLOC(imp, sizeof(*imp));
1303         if (imp == NULL)
1304                 return NULL;
1305
1306         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1307         INIT_LIST_HEAD(&imp->imp_replay_list);
1308         INIT_LIST_HEAD(&imp->imp_sending_list);
1309         INIT_LIST_HEAD(&imp->imp_delayed_list);
1310         INIT_LIST_HEAD(&imp->imp_committed_list);
1311         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1312         imp->imp_known_replied_xid = 0;
1313         imp->imp_replay_cursor = &imp->imp_committed_list;
1314         spin_lock_init(&imp->imp_lock);
1315         imp->imp_last_success_conn = 0;
1316         imp->imp_state = LUSTRE_IMP_NEW;
1317         imp->imp_obd = class_incref(obd, "import", imp);
1318         mutex_init(&imp->imp_sec_mutex);
1319         init_waitqueue_head(&imp->imp_recovery_waitq);
1320         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1321
1322         if (curr_pid_ns && curr_pid_ns->child_reaper)
1323                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1324         else
1325                 imp->imp_sec_refpid = 1;
1326
1327         atomic_set(&imp->imp_refcount, 2);
1328         atomic_set(&imp->imp_unregistering, 0);
1329         atomic_set(&imp->imp_reqs, 0);
1330         atomic_set(&imp->imp_inflight, 0);
1331         atomic_set(&imp->imp_replay_inflight, 0);
1332         atomic_set(&imp->imp_inval_count, 0);
1333         INIT_LIST_HEAD(&imp->imp_conn_list);
1334         init_imp_at(&imp->imp_at);
1335
1336         /* the default magic is V2, will be used in connect RPC, and
1337          * then adjusted according to the flags in request/reply. */
1338         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1339
1340         return imp;
1341 }
1342 EXPORT_SYMBOL(class_new_import);
1343
1344 void class_destroy_import(struct obd_import *import)
1345 {
1346         LASSERT(import != NULL);
1347         LASSERT(import != LP_POISON);
1348
1349         spin_lock(&import->imp_lock);
1350         import->imp_generation++;
1351         spin_unlock(&import->imp_lock);
1352         class_import_put(import);
1353 }
1354 EXPORT_SYMBOL(class_destroy_import);
1355
1356 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1357
1358 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1359 {
1360         spin_lock(&exp->exp_locks_list_guard);
1361
1362         LASSERT(lock->l_exp_refs_nr >= 0);
1363
1364         if (lock->l_exp_refs_target != NULL &&
1365             lock->l_exp_refs_target != exp) {
1366                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1367                               exp, lock, lock->l_exp_refs_target);
1368         }
1369         if ((lock->l_exp_refs_nr ++) == 0) {
1370                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1371                 lock->l_exp_refs_target = exp;
1372         }
1373         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1374                lock, exp, lock->l_exp_refs_nr);
1375         spin_unlock(&exp->exp_locks_list_guard);
1376 }
1377 EXPORT_SYMBOL(__class_export_add_lock_ref);
1378
1379 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1380 {
1381         spin_lock(&exp->exp_locks_list_guard);
1382         LASSERT(lock->l_exp_refs_nr > 0);
1383         if (lock->l_exp_refs_target != exp) {
1384                 LCONSOLE_WARN("lock %p, "
1385                               "mismatching export pointers: %p, %p\n",
1386                               lock, lock->l_exp_refs_target, exp);
1387         }
1388         if (-- lock->l_exp_refs_nr == 0) {
1389                 list_del_init(&lock->l_exp_refs_link);
1390                 lock->l_exp_refs_target = NULL;
1391         }
1392         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1393                lock, exp, lock->l_exp_refs_nr);
1394         spin_unlock(&exp->exp_locks_list_guard);
1395 }
1396 EXPORT_SYMBOL(__class_export_del_lock_ref);
1397 #endif
1398
1399 /* A connection defines an export context in which preallocation can
1400    be managed. This releases the export pointer reference, and returns
1401    the export handle, so the export refcount is 1 when this function
1402    returns. */
1403 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1404                   struct obd_uuid *cluuid)
1405 {
1406         struct obd_export *export;
1407         LASSERT(conn != NULL);
1408         LASSERT(obd != NULL);
1409         LASSERT(cluuid != NULL);
1410         ENTRY;
1411
1412         export = class_new_export(obd, cluuid);
1413         if (IS_ERR(export))
1414                 RETURN(PTR_ERR(export));
1415
1416         conn->cookie = export->exp_handle.h_cookie;
1417         class_export_put(export);
1418
1419         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1420                cluuid->uuid, conn->cookie);
1421         RETURN(0);
1422 }
1423 EXPORT_SYMBOL(class_connect);
1424
1425 /* if export is involved in recovery then clean up related things */
1426 static void class_export_recovery_cleanup(struct obd_export *exp)
1427 {
1428         struct obd_device *obd = exp->exp_obd;
1429
1430         spin_lock(&obd->obd_recovery_task_lock);
1431         if (obd->obd_recovering) {
1432                 if (exp->exp_in_recovery) {
1433                         spin_lock(&exp->exp_lock);
1434                         exp->exp_in_recovery = 0;
1435                         spin_unlock(&exp->exp_lock);
1436                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1437                         atomic_dec(&obd->obd_connected_clients);
1438                 }
1439
1440                 /* if called during recovery then should update
1441                  * obd_stale_clients counter,
1442                  * lightweight exports are not counted */
1443                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1444                         exp->exp_obd->obd_stale_clients++;
1445         }
1446         spin_unlock(&obd->obd_recovery_task_lock);
1447
1448         spin_lock(&exp->exp_lock);
1449         /** Cleanup req replay fields */
1450         if (exp->exp_req_replay_needed) {
1451                 exp->exp_req_replay_needed = 0;
1452
1453                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1454                 atomic_dec(&obd->obd_req_replay_clients);
1455         }
1456
1457         /** Cleanup lock replay data */
1458         if (exp->exp_lock_replay_needed) {
1459                 exp->exp_lock_replay_needed = 0;
1460
1461                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1462                 atomic_dec(&obd->obd_lock_replay_clients);
1463         }
1464         spin_unlock(&exp->exp_lock);
1465 }
1466
1467 /* This function removes 1-3 references from the export:
1468  * 1 - for export pointer passed
1469  * and if disconnect really need
1470  * 2 - removing from hash
1471  * 3 - in client_unlink_export
1472  * The export pointer passed to this function can destroyed */
1473 int class_disconnect(struct obd_export *export)
1474 {
1475         int already_disconnected;
1476         ENTRY;
1477
1478         if (export == NULL) {
1479                 CWARN("attempting to free NULL export %p\n", export);
1480                 RETURN(-EINVAL);
1481         }
1482
1483         spin_lock(&export->exp_lock);
1484         already_disconnected = export->exp_disconnected;
1485         export->exp_disconnected = 1;
1486         /*  We hold references of export for uuid hash
1487          *  and nid_hash and export link at least. So
1488          *  it is safe to call cfs_hash_del in there.  */
1489         if (!hlist_unhashed(&export->exp_nid_hash))
1490                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1491                              &export->exp_connection->c_peer.nid,
1492                              &export->exp_nid_hash);
1493         spin_unlock(&export->exp_lock);
1494
1495         /* class_cleanup(), abort_recovery(), and class_fail_export()
1496          * all end up in here, and if any of them race we shouldn't
1497          * call extra class_export_puts(). */
1498         if (already_disconnected) {
1499                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1500                 GOTO(no_disconn, already_disconnected);
1501         }
1502
1503         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1504                export->exp_handle.h_cookie);
1505
1506         class_export_recovery_cleanup(export);
1507         class_unlink_export(export);
1508 no_disconn:
1509         class_export_put(export);
1510         RETURN(0);
1511 }
1512 EXPORT_SYMBOL(class_disconnect);
1513
1514 /* Return non-zero for a fully connected export */
1515 int class_connected_export(struct obd_export *exp)
1516 {
1517         int connected = 0;
1518
1519         if (exp) {
1520                 spin_lock(&exp->exp_lock);
1521                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1522                 spin_unlock(&exp->exp_lock);
1523         }
1524         return connected;
1525 }
1526 EXPORT_SYMBOL(class_connected_export);
1527
1528 static void class_disconnect_export_list(struct list_head *list,
1529                                          enum obd_option flags)
1530 {
1531         int rc;
1532         struct obd_export *exp;
1533         ENTRY;
1534
1535         /* It's possible that an export may disconnect itself, but
1536          * nothing else will be added to this list. */
1537         while (!list_empty(list)) {
1538                 exp = list_entry(list->next, struct obd_export,
1539                                  exp_obd_chain);
1540                 /* need for safe call CDEBUG after obd_disconnect */
1541                 class_export_get(exp);
1542
1543                 spin_lock(&exp->exp_lock);
1544                 exp->exp_flags = flags;
1545                 spin_unlock(&exp->exp_lock);
1546
1547                 if (obd_uuid_equals(&exp->exp_client_uuid,
1548                                     &exp->exp_obd->obd_uuid)) {
1549                         CDEBUG(D_HA,
1550                                "exp %p export uuid == obd uuid, don't discon\n",
1551                                exp);
1552                         /* Need to delete this now so we don't end up pointing
1553                          * to work_list later when this export is cleaned up. */
1554                         list_del_init(&exp->exp_obd_chain);
1555                         class_export_put(exp);
1556                         continue;
1557                 }
1558
1559                 class_export_get(exp);
1560                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1561                        "last request at %lld\n",
1562                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1563                        exp, exp->exp_last_request_time);
1564                 /* release one export reference anyway */
1565                 rc = obd_disconnect(exp);
1566
1567                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1568                        obd_export_nid2str(exp), exp, rc);
1569                 class_export_put(exp);
1570         }
1571         EXIT;
1572 }
1573
1574 void class_disconnect_exports(struct obd_device *obd)
1575 {
1576         struct list_head work_list;
1577         ENTRY;
1578
1579         /* Move all of the exports from obd_exports to a work list, en masse. */
1580         INIT_LIST_HEAD(&work_list);
1581         spin_lock(&obd->obd_dev_lock);
1582         list_splice_init(&obd->obd_exports, &work_list);
1583         list_splice_init(&obd->obd_delayed_exports, &work_list);
1584         spin_unlock(&obd->obd_dev_lock);
1585
1586         if (!list_empty(&work_list)) {
1587                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1588                        "disconnecting them\n", obd->obd_minor, obd);
1589                 class_disconnect_export_list(&work_list,
1590                                              exp_flags_from_obd(obd));
1591         } else
1592                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1593                        obd->obd_minor, obd);
1594         EXIT;
1595 }
1596 EXPORT_SYMBOL(class_disconnect_exports);
1597
1598 /* Remove exports that have not completed recovery.
1599  */
1600 void class_disconnect_stale_exports(struct obd_device *obd,
1601                                     int (*test_export)(struct obd_export *))
1602 {
1603         struct list_head work_list;
1604         struct obd_export *exp, *n;
1605         int evicted = 0;
1606         ENTRY;
1607
1608         INIT_LIST_HEAD(&work_list);
1609         spin_lock(&obd->obd_dev_lock);
1610         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1611                                  exp_obd_chain) {
1612                 /* don't count self-export as client */
1613                 if (obd_uuid_equals(&exp->exp_client_uuid,
1614                                     &exp->exp_obd->obd_uuid))
1615                         continue;
1616
1617                 /* don't evict clients which have no slot in last_rcvd
1618                  * (e.g. lightweight connection) */
1619                 if (exp->exp_target_data.ted_lr_idx == -1)
1620                         continue;
1621
1622                 spin_lock(&exp->exp_lock);
1623                 if (exp->exp_failed || test_export(exp)) {
1624                         spin_unlock(&exp->exp_lock);
1625                         continue;
1626                 }
1627                 exp->exp_failed = 1;
1628                 spin_unlock(&exp->exp_lock);
1629
1630                 list_move(&exp->exp_obd_chain, &work_list);
1631                 evicted++;
1632                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1633                        obd->obd_name, exp->exp_client_uuid.uuid,
1634                        obd_export_nid2str(exp));
1635                 print_export_data(exp, "EVICTING", 0, D_HA);
1636         }
1637         spin_unlock(&obd->obd_dev_lock);
1638
1639         if (evicted)
1640                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1641                               obd->obd_name, evicted);
1642
1643         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1644                                                  OBD_OPT_ABORT_RECOV);
1645         EXIT;
1646 }
1647 EXPORT_SYMBOL(class_disconnect_stale_exports);
1648
1649 void class_fail_export(struct obd_export *exp)
1650 {
1651         int rc, already_failed;
1652
1653         spin_lock(&exp->exp_lock);
1654         already_failed = exp->exp_failed;
1655         exp->exp_failed = 1;
1656         spin_unlock(&exp->exp_lock);
1657
1658         if (already_failed) {
1659                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1660                        exp, exp->exp_client_uuid.uuid);
1661                 return;
1662         }
1663
1664         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1665                exp, exp->exp_client_uuid.uuid);
1666
1667         if (obd_dump_on_timeout)
1668                 libcfs_debug_dumplog();
1669
1670         /* need for safe call CDEBUG after obd_disconnect */
1671         class_export_get(exp);
1672
1673         /* Most callers into obd_disconnect are removing their own reference
1674          * (request, for example) in addition to the one from the hash table.
1675          * We don't have such a reference here, so make one. */
1676         class_export_get(exp);
1677         rc = obd_disconnect(exp);
1678         if (rc)
1679                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1680         else
1681                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1682                        exp, exp->exp_client_uuid.uuid);
1683         class_export_put(exp);
1684 }
1685 EXPORT_SYMBOL(class_fail_export);
1686
1687 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1688 {
1689         struct cfs_hash *nid_hash;
1690         struct obd_export *doomed_exp = NULL;
1691         int exports_evicted = 0;
1692
1693         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1694
1695         spin_lock(&obd->obd_dev_lock);
1696         /* umount has run already, so evict thread should leave
1697          * its task to umount thread now */
1698         if (obd->obd_stopping) {
1699                 spin_unlock(&obd->obd_dev_lock);
1700                 return exports_evicted;
1701         }
1702         nid_hash = obd->obd_nid_hash;
1703         cfs_hash_getref(nid_hash);
1704         spin_unlock(&obd->obd_dev_lock);
1705
1706         do {
1707                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1708                 if (doomed_exp == NULL)
1709                         break;
1710
1711                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1712                          "nid %s found, wanted nid %s, requested nid %s\n",
1713                          obd_export_nid2str(doomed_exp),
1714                          libcfs_nid2str(nid_key), nid);
1715                 LASSERTF(doomed_exp != obd->obd_self_export,
1716                          "self-export is hashed by NID?\n");
1717                 exports_evicted++;
1718                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1719                               "request\n", obd->obd_name,
1720                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1721                               obd_export_nid2str(doomed_exp));
1722                 class_fail_export(doomed_exp);
1723                 class_export_put(doomed_exp);
1724         } while (1);
1725
1726         cfs_hash_putref(nid_hash);
1727
1728         if (!exports_evicted)
1729                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1730                        obd->obd_name, nid);
1731         return exports_evicted;
1732 }
1733 EXPORT_SYMBOL(obd_export_evict_by_nid);
1734
1735 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1736 {
1737         struct cfs_hash *uuid_hash;
1738         struct obd_export *doomed_exp = NULL;
1739         struct obd_uuid doomed_uuid;
1740         int exports_evicted = 0;
1741
1742         spin_lock(&obd->obd_dev_lock);
1743         if (obd->obd_stopping) {
1744                 spin_unlock(&obd->obd_dev_lock);
1745                 return exports_evicted;
1746         }
1747         uuid_hash = obd->obd_uuid_hash;
1748         cfs_hash_getref(uuid_hash);
1749         spin_unlock(&obd->obd_dev_lock);
1750
1751         obd_str2uuid(&doomed_uuid, uuid);
1752         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1753                 CERROR("%s: can't evict myself\n", obd->obd_name);
1754                 cfs_hash_putref(uuid_hash);
1755                 return exports_evicted;
1756         }
1757
1758         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1759
1760         if (doomed_exp == NULL) {
1761                 CERROR("%s: can't disconnect %s: no exports found\n",
1762                        obd->obd_name, uuid);
1763         } else {
1764                 CWARN("%s: evicting %s at adminstrative request\n",
1765                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1766                 class_fail_export(doomed_exp);
1767                 class_export_put(doomed_exp);
1768                 exports_evicted++;
1769         }
1770         cfs_hash_putref(uuid_hash);
1771
1772         return exports_evicted;
1773 }
1774
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback.  print_export_data() calls it when
 * lock dumping is requested and the pointer is set.  As a file-scope
 * object it starts out NULL without an explicit initializer. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1779
1780 static void print_export_data(struct obd_export *exp, const char *status,
1781                               int locks, int debug_level)
1782 {
1783         struct ptlrpc_reply_state *rs;
1784         struct ptlrpc_reply_state *first_reply = NULL;
1785         int nreplies = 0;
1786
1787         spin_lock(&exp->exp_lock);
1788         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1789                             rs_exp_list) {
1790                 if (nreplies == 0)
1791                         first_reply = rs;
1792                 nreplies++;
1793         }
1794         spin_unlock(&exp->exp_lock);
1795
1796         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1797                "%p %s %llu stale:%d\n",
1798                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1799                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1800                atomic_read(&exp->exp_rpc_count),
1801                atomic_read(&exp->exp_cb_count),
1802                atomic_read(&exp->exp_locks_count),
1803                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1804                nreplies, first_reply, nreplies > 3 ? "..." : "",
1805                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1806 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1807         if (locks && class_export_dump_hook != NULL)
1808                 class_export_dump_hook(exp);
1809 #endif
1810 }
1811
1812 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1813 {
1814         struct obd_export *exp;
1815
1816         spin_lock(&obd->obd_dev_lock);
1817         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1818                 print_export_data(exp, "ACTIVE", locks, debug_level);
1819         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1820                 print_export_data(exp, "UNLINKED", locks, debug_level);
1821         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1822                 print_export_data(exp, "DELAYED", locks, debug_level);
1823         spin_unlock(&obd->obd_dev_lock);
1824 }
1825
1826 void obd_exports_barrier(struct obd_device *obd)
1827 {
1828         int waited = 2;
1829         LASSERT(list_empty(&obd->obd_exports));
1830         spin_lock(&obd->obd_dev_lock);
1831         while (!list_empty(&obd->obd_unlinked_exports)) {
1832                 spin_unlock(&obd->obd_dev_lock);
1833                 set_current_state(TASK_UNINTERRUPTIBLE);
1834                 schedule_timeout(cfs_time_seconds(waited));
1835                 if (waited > 5 && is_power_of_2(waited)) {
1836                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1837                                       "more than %d seconds. "
1838                                       "The obd refcount = %d. Is it stuck?\n",
1839                                       obd->obd_name, waited,
1840                                       atomic_read(&obd->obd_refcount));
1841                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1842                 }
1843                 waited *= 2;
1844                 spin_lock(&obd->obd_dev_lock);
1845         }
1846         spin_unlock(&obd->obd_dev_lock);
1847 }
1848 EXPORT_SYMBOL(obd_exports_barrier);
1849
1850 /**
1851  * Add export to the obd_zombe thread and notify it.
1852  */
1853 static void obd_zombie_export_add(struct obd_export *exp) {
1854         atomic_dec(&obd_stale_export_num);
1855         spin_lock(&exp->exp_obd->obd_dev_lock);
1856         LASSERT(!list_empty(&exp->exp_obd_chain));
1857         list_del_init(&exp->exp_obd_chain);
1858         spin_unlock(&exp->exp_obd->obd_dev_lock);
1859
1860         queue_work(zombie_wq, &exp->exp_zombie_work);
1861 }
1862
1863 /**
1864  * Add import to the obd_zombe thread and notify it.
1865  */
1866 static void obd_zombie_import_add(struct obd_import *imp) {
1867         LASSERT(imp->imp_sec == NULL);
1868
1869         queue_work(zombie_wq, &imp->imp_zombie_work);
1870 }
1871
1872 /**
1873  * wait when obd_zombie import/export queues become empty
1874  */
1875 void obd_zombie_barrier(void)
1876 {
1877         flush_workqueue(zombie_wq);
1878 }
1879 EXPORT_SYMBOL(obd_zombie_barrier);
1880
1881
1882 struct obd_export *obd_stale_export_get(void)
1883 {
1884         struct obd_export *exp = NULL;
1885         ENTRY;
1886
1887         spin_lock(&obd_stale_export_lock);
1888         if (!list_empty(&obd_stale_exports)) {
1889                 exp = list_entry(obd_stale_exports.next,
1890                                  struct obd_export, exp_stale_list);
1891                 list_del_init(&exp->exp_stale_list);
1892         }
1893         spin_unlock(&obd_stale_export_lock);
1894
1895         if (exp) {
1896                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1897                        atomic_read(&obd_stale_export_num));
1898         }
1899         RETURN(exp);
1900 }
1901 EXPORT_SYMBOL(obd_stale_export_get);
1902
1903 void obd_stale_export_put(struct obd_export *exp)
1904 {
1905         ENTRY;
1906
1907         LASSERT(list_empty(&exp->exp_stale_list));
1908         if (exp->exp_lock_hash &&
1909             atomic_read(&exp->exp_lock_hash->hs_count)) {
1910                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1911                        atomic_read(&obd_stale_export_num));
1912
1913                 spin_lock_bh(&exp->exp_bl_list_lock);
1914                 spin_lock(&obd_stale_export_lock);
1915                 /* Add to the tail if there is no blocked locks,
1916                  * to the head otherwise. */
1917                 if (list_empty(&exp->exp_bl_list))
1918                         list_add_tail(&exp->exp_stale_list,
1919                                       &obd_stale_exports);
1920                 else
1921                         list_add(&exp->exp_stale_list,
1922                                  &obd_stale_exports);
1923
1924                 spin_unlock(&obd_stale_export_lock);
1925                 spin_unlock_bh(&exp->exp_bl_list_lock);
1926         } else {
1927                 class_export_put(exp);
1928         }
1929         EXIT;
1930 }
1931 EXPORT_SYMBOL(obd_stale_export_put);
1932
1933 /**
1934  * Adjust the position of the export in the stale list,
1935  * i.e. move to the head of the list if is needed.
1936  **/
1937 void obd_stale_export_adjust(struct obd_export *exp)
1938 {
1939         LASSERT(exp != NULL);
1940         spin_lock_bh(&exp->exp_bl_list_lock);
1941         spin_lock(&obd_stale_export_lock);
1942
1943         if (!list_empty(&exp->exp_stale_list) &&
1944             !list_empty(&exp->exp_bl_list))
1945                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1946
1947         spin_unlock(&obd_stale_export_lock);
1948         spin_unlock_bh(&exp->exp_bl_list_lock);
1949 }
1950 EXPORT_SYMBOL(obd_stale_export_adjust);
1951
1952 /**
1953  * start destroy zombie import/export thread
1954  */
1955 int obd_zombie_impexp_init(void)
1956 {
1957         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1958         if (!zombie_wq)
1959                 return -ENOMEM;
1960
1961         return 0;
1962 }
1963
1964 /**
1965  * stop destroy zombie import/export thread
1966  */
1967 void obd_zombie_impexp_stop(void)
1968 {
1969         destroy_workqueue(zombie_wq);
1970         LASSERT(list_empty(&obd_stale_exports));
1971 }
1972
1973 /***** Kernel-userspace comm helpers *******/
1974
1975 /* Get length of entire message, including header */
1976 int kuc_len(int payload_len)
1977 {
1978         return sizeof(struct kuc_hdr) + payload_len;
1979 }
1980 EXPORT_SYMBOL(kuc_len);
1981
1982 /* Get a pointer to kuc header, given a ptr to the payload
1983  * @param p Pointer to payload area
1984  * @returns Pointer to kuc header
1985  */
1986 struct kuc_hdr * kuc_ptr(void *p)
1987 {
1988         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1989         LASSERT(lh->kuc_magic == KUC_MAGIC);
1990         return lh;
1991 }
1992 EXPORT_SYMBOL(kuc_ptr);
1993
1994 /* Alloc space for a message, and fill in header
1995  * @return Pointer to payload area
1996  */
1997 void *kuc_alloc(int payload_len, int transport, int type)
1998 {
1999         struct kuc_hdr *lh;
2000         int len = kuc_len(payload_len);
2001
2002         OBD_ALLOC(lh, len);
2003         if (lh == NULL)
2004                 return ERR_PTR(-ENOMEM);
2005
2006         lh->kuc_magic = KUC_MAGIC;
2007         lh->kuc_transport = transport;
2008         lh->kuc_msgtype = type;
2009         lh->kuc_msglen = len;
2010
2011         return (void *)(lh + 1);
2012 }
2013 EXPORT_SYMBOL(kuc_alloc);
2014
/* Free a message obtained from kuc_alloc().
 * @p is the payload pointer and @payload_len the payload size that was
 * passed to kuc_alloc(); the header is recovered with kuc_ptr(). */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
2021 EXPORT_SYMBOL(kuc_free);
2022
2023 struct obd_request_slot_waiter {
2024         struct list_head        orsw_entry;
2025         wait_queue_head_t       orsw_waitq;
2026         bool                    orsw_signaled;
2027 };
2028
2029 static bool obd_request_slot_avail(struct client_obd *cli,
2030                                    struct obd_request_slot_waiter *orsw)
2031 {
2032         bool avail;
2033
2034         spin_lock(&cli->cl_loi_list_lock);
2035         avail = !!list_empty(&orsw->orsw_entry);
2036         spin_unlock(&cli->cl_loi_list_lock);
2037
2038         return avail;
2039 };
2040
2041 /*
2042  * For network flow control, the RPC sponsor needs to acquire a credit
2043  * before sending the RPC. The credits count for a connection is defined
2044  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2045  * the subsequent RPC sponsors need to wait until others released their
2046  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2047  */
2048 int obd_get_request_slot(struct client_obd *cli)
2049 {
2050         struct obd_request_slot_waiter   orsw;
2051         struct l_wait_info               lwi;
2052         int                              rc;
2053
2054         spin_lock(&cli->cl_loi_list_lock);
2055         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2056                 cli->cl_rpcs_in_flight++;
2057                 spin_unlock(&cli->cl_loi_list_lock);
2058                 return 0;
2059         }
2060
2061         init_waitqueue_head(&orsw.orsw_waitq);
2062         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2063         orsw.orsw_signaled = false;
2064         spin_unlock(&cli->cl_loi_list_lock);
2065
2066         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2067         rc = l_wait_event(orsw.orsw_waitq,
2068                           obd_request_slot_avail(cli, &orsw) ||
2069                           orsw.orsw_signaled,
2070                           &lwi);
2071
2072         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2073          * freed but other (such as obd_put_request_slot) is using it. */
2074         spin_lock(&cli->cl_loi_list_lock);
2075         if (rc != 0) {
2076                 if (!orsw.orsw_signaled) {
2077                         if (list_empty(&orsw.orsw_entry))
2078                                 cli->cl_rpcs_in_flight--;
2079                         else
2080                                 list_del(&orsw.orsw_entry);
2081                 }
2082         }
2083
2084         if (orsw.orsw_signaled) {
2085                 LASSERT(list_empty(&orsw.orsw_entry));
2086
2087                 rc = -EINTR;
2088         }
2089         spin_unlock(&cli->cl_loi_list_lock);
2090
2091         return rc;
2092 }
2093 EXPORT_SYMBOL(obd_get_request_slot);
2094
2095 void obd_put_request_slot(struct client_obd *cli)
2096 {
2097         struct obd_request_slot_waiter *orsw;
2098
2099         spin_lock(&cli->cl_loi_list_lock);
2100         cli->cl_rpcs_in_flight--;
2101
2102         /* If there is free slot, wakeup the first waiter. */
2103         if (!list_empty(&cli->cl_flight_waiters) &&
2104             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2105                 orsw = list_entry(cli->cl_flight_waiters.next,
2106                                   struct obd_request_slot_waiter, orsw_entry);
2107                 list_del_init(&orsw->orsw_entry);
2108                 cli->cl_rpcs_in_flight++;
2109                 wake_up(&orsw->orsw_waitq);
2110         }
2111         spin_unlock(&cli->cl_loi_list_lock);
2112 }
2113 EXPORT_SYMBOL(obd_put_request_slot);
2114
2115 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2116 {
2117         return cli->cl_max_rpcs_in_flight;
2118 }
2119 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2120
2121 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2122 {
2123         struct obd_request_slot_waiter *orsw;
2124         __u32                           old;
2125         int                             diff;
2126         int                             i;
2127         char                            *typ_name;
2128         int                             rc;
2129
2130         if (max > OBD_MAX_RIF_MAX || max < 1)
2131                 return -ERANGE;
2132
2133         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2134         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2135                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2136                  * strictly lower that max_rpcs_in_flight */
2137                 if (max < 2) {
2138                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2139                                "because it must be higher than "
2140                                "max_mod_rpcs_in_flight value",
2141                                cli->cl_import->imp_obd->obd_name);
2142                         return -ERANGE;
2143                 }
2144                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2145                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2146                         if (rc != 0)
2147                                 return rc;
2148                 }
2149         }
2150
2151         spin_lock(&cli->cl_loi_list_lock);
2152         old = cli->cl_max_rpcs_in_flight;
2153         cli->cl_max_rpcs_in_flight = max;
2154         client_adjust_max_dirty(cli);
2155
2156         diff = max - old;
2157
2158         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2159         for (i = 0; i < diff; i++) {
2160                 if (list_empty(&cli->cl_flight_waiters))
2161                         break;
2162
2163                 orsw = list_entry(cli->cl_flight_waiters.next,
2164                                   struct obd_request_slot_waiter, orsw_entry);
2165                 list_del_init(&orsw->orsw_entry);
2166                 cli->cl_rpcs_in_flight++;
2167                 wake_up(&orsw->orsw_waitq);
2168         }
2169         spin_unlock(&cli->cl_loi_list_lock);
2170
2171         return 0;
2172 }
2173 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2174
2175 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2176 {
2177         return cli->cl_max_mod_rpcs_in_flight;
2178 }
2179 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2180
2181 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2182 {
2183         struct obd_connect_data *ocd;
2184         __u16 maxmodrpcs;
2185         __u16 prev;
2186
2187         if (max > OBD_MAX_RIF_MAX || max < 1)
2188                 return -ERANGE;
2189
2190         /* cannot exceed or equal max_rpcs_in_flight */
2191         if (max >= cli->cl_max_rpcs_in_flight) {
2192                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2193                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2194                        cli->cl_import->imp_obd->obd_name,
2195                        max, cli->cl_max_rpcs_in_flight);
2196                 return -ERANGE;
2197         }
2198
2199         /* cannot exceed max modify RPCs in flight supported by the server */
2200         ocd = &cli->cl_import->imp_connect_data;
2201         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2202                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2203         else
2204                 maxmodrpcs = 1;
2205         if (max > maxmodrpcs) {
2206                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2207                        "higher than max_mod_rpcs_per_client value (%hu) "
2208                        "returned by the server at connection\n",
2209                        cli->cl_import->imp_obd->obd_name,
2210                        max, maxmodrpcs);
2211                 return -ERANGE;
2212         }
2213
2214         spin_lock(&cli->cl_mod_rpcs_lock);
2215
2216         prev = cli->cl_max_mod_rpcs_in_flight;
2217         cli->cl_max_mod_rpcs_in_flight = max;
2218
2219         /* wakeup waiters if limit has been increased */
2220         if (cli->cl_max_mod_rpcs_in_flight > prev)
2221                 wake_up(&cli->cl_mod_rpcs_waitq);
2222
2223         spin_unlock(&cli->cl_mod_rpcs_lock);
2224
2225         return 0;
2226 }
2227 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2228
2229 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2230                                struct seq_file *seq)
2231 {
2232         unsigned long mod_tot = 0, mod_cum;
2233         struct timespec64 now;
2234         int i;
2235
2236         ktime_get_real_ts64(&now);
2237
2238         spin_lock(&cli->cl_mod_rpcs_lock);
2239
2240         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2241                    (s64)now.tv_sec, now.tv_nsec);
2242         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2243                    cli->cl_mod_rpcs_in_flight);
2244
2245         seq_printf(seq, "\n\t\t\tmodify\n");
2246         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2247
2248         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2249
2250         mod_cum = 0;
2251         for (i = 0; i < OBD_HIST_MAX; i++) {
2252                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2253                 mod_cum += mod;
2254                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2255                            i, mod, pct(mod, mod_tot),
2256                            pct(mod_cum, mod_tot));
2257                 if (mod_cum == mod_tot)
2258                         break;
2259         }
2260
2261         spin_unlock(&cli->cl_mod_rpcs_lock);
2262
2263         return 0;
2264 }
2265 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2266
2267 /* The number of modify RPCs sent in parallel is limited
2268  * because the server has a finite number of slots per client to
2269  * store request result and ensure reply reconstruction when needed.
2270  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2271  * that takes into account server limit and cl_max_rpcs_in_flight
2272  * value.
2273  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2274  * one close request is allowed above the maximum.
2275  */
2276 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2277                                                  bool close_req)
2278 {
2279         bool avail;
2280
2281         /* A slot is available if
2282          * - number of modify RPCs in flight is less than the max
2283          * - it's a close RPC and no other close request is in flight
2284          */
2285         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2286                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2287
2288         return avail;
2289 }
2290
2291 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2292                                          bool close_req)
2293 {
2294         bool avail;
2295
2296         spin_lock(&cli->cl_mod_rpcs_lock);
2297         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2298         spin_unlock(&cli->cl_mod_rpcs_lock);
2299         return avail;
2300 }
2301
2302 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2303 {
2304         if (it != NULL &&
2305             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2306              it->it_op == IT_READDIR ||
2307              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2308                         return true;
2309         return false;
2310 }
2311
2312 /* Get a modify RPC slot from the obd client @cli according
2313  * to the kind of operation @opc that is going to be sent
2314  * and the intent @it of the operation if it applies.
2315  * If the maximum number of modify RPCs in flight is reached
2316  * the thread is put to sleep.
2317  * Returns the tag to be set in the request message. Tag 0
2318  * is reserved for non-modifying requests.
2319  */
2320 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2321                            struct lookup_intent *it)
2322 {
2323         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2324         bool                    close_req = false;
2325         __u16                   i, max;
2326
2327         /* read-only metadata RPCs don't consume a slot on MDT
2328          * for reply reconstruction
2329          */
2330         if (obd_skip_mod_rpc_slot(it))
2331                 return 0;
2332
2333         if (opc == MDS_CLOSE)
2334                 close_req = true;
2335
2336         do {
2337                 spin_lock(&cli->cl_mod_rpcs_lock);
2338                 max = cli->cl_max_mod_rpcs_in_flight;
2339                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2340                         /* there is a slot available */
2341                         cli->cl_mod_rpcs_in_flight++;
2342                         if (close_req)
2343                                 cli->cl_close_rpcs_in_flight++;
2344                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2345                                          cli->cl_mod_rpcs_in_flight);
2346                         /* find a free tag */
2347                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2348                                                 max + 1);
2349                         LASSERT(i < OBD_MAX_RIF_MAX);
2350                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2351                         spin_unlock(&cli->cl_mod_rpcs_lock);
2352                         /* tag 0 is reserved for non-modify RPCs */
2353                         return i + 1;
2354                 }
2355                 spin_unlock(&cli->cl_mod_rpcs_lock);
2356
2357                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2358                        "opc %u, max %hu\n",
2359                        cli->cl_import->imp_obd->obd_name, opc, max);
2360
2361                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2362                                        obd_mod_rpc_slot_avail(cli, close_req),
2363                                        &lwi);
2364         } while (true);
2365 }
2366 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2367
2368 /* Put a modify RPC slot from the obd client @cli according
2369  * to the kind of operation @opc that has been sent and the
2370  * intent @it of the operation if it applies.
2371  */
2372 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2373                           struct lookup_intent *it, __u16 tag)
2374 {
2375         bool                    close_req = false;
2376
2377         if (obd_skip_mod_rpc_slot(it))
2378                 return;
2379
2380         if (opc == MDS_CLOSE)
2381                 close_req = true;
2382
2383         spin_lock(&cli->cl_mod_rpcs_lock);
2384         cli->cl_mod_rpcs_in_flight--;
2385         if (close_req)
2386                 cli->cl_close_rpcs_in_flight--;
2387         /* release the tag in the bitmap */
2388         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2389         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2390         spin_unlock(&cli->cl_mod_rpcs_lock);
2391         wake_up(&cli->cl_mod_rpcs_waitq);
2392 }
2393 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2394