Whamcloud - gitweb
22ad827c1bfe10896e52397696b8251854fb8ee1
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
53
54 static struct kmem_cache *obd_device_cachep;
55 struct kmem_cache *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 static struct kmem_cache *import_cachep;
58
59 static struct workqueue_struct *zombie_wq;
60
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks, int debug_level);
65
66 static LIST_HEAD(obd_stale_exports);
67 static DEFINE_SPINLOCK(obd_stale_export_lock);
68 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
69
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
72
73 /*
74  * support functions: we could use inter-module communication, but this
75  * is more portable to other OS's
76  */
77 static struct obd_device *obd_device_alloc(void)
78 {
79         struct obd_device *obd;
80
81         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
82         if (obd != NULL) {
83                 obd->obd_magic = OBD_DEVICE_MAGIC;
84         }
85         return obd;
86 }
87
88 static void obd_device_free(struct obd_device *obd)
89 {
90         LASSERT(obd != NULL);
91         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
92                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
93         if (obd->obd_namespace != NULL) {
94                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
95                        obd, obd->obd_namespace, obd->obd_force);
96                 LBUG();
97         }
98         lu_ref_fini(&obd->obd_reference);
99         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
100 }
101
102 struct obd_type *class_search_type(const char *name)
103 {
104         struct list_head *tmp;
105         struct obd_type *type;
106
107         spin_lock(&obd_types_lock);
108         list_for_each(tmp, &obd_types) {
109                 type = list_entry(tmp, struct obd_type, typ_chain);
110                 if (strcmp(type->typ_name, name) == 0) {
111                         spin_unlock(&obd_types_lock);
112                         return type;
113                 }
114         }
115         spin_unlock(&obd_types_lock);
116         return NULL;
117 }
118 EXPORT_SYMBOL(class_search_type);
119
120 struct obd_type *class_get_type(const char *name)
121 {
122         struct obd_type *type = class_search_type(name);
123
124 #ifdef HAVE_MODULE_LOADING_SUPPORT
125         if (!type) {
126                 const char *modname = name;
127
128                 if (strcmp(modname, "obdfilter") == 0)
129                         modname = "ofd";
130
131                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
132                         modname = LUSTRE_OSP_NAME;
133
134                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
135                         modname = LUSTRE_MDT_NAME;
136
137                 if (!request_module("%s", modname)) {
138                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
139                         type = class_search_type(name);
140                 } else {
141                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
142                                            modname);
143                 }
144         }
145 #endif
146         if (type) {
147                 spin_lock(&type->obd_type_lock);
148                 type->typ_refcnt++;
149                 try_module_get(type->typ_dt_ops->o_owner);
150                 spin_unlock(&type->obd_type_lock);
151         }
152         return type;
153 }
154
155 void class_put_type(struct obd_type *type)
156 {
157         LASSERT(type);
158         spin_lock(&type->obd_type_lock);
159         type->typ_refcnt--;
160         module_put(type->typ_dt_ops->o_owner);
161         spin_unlock(&type->obd_type_lock);
162 }
163
/* kobject release callback: free the kobject itself. */
static void class_sysfs_release(struct kobject *kobj)
{
	OBD_FREE_PTR(kobj);
}
168
169 static struct kobj_type class_ktype = {
170         .sysfs_ops      = &lustre_sysfs_ops,
171         .release        = class_sysfs_release,
172 };
173
174 struct kobject *class_setup_tunables(const char *name)
175 {
176         struct kobject *kobj;
177         int rc;
178
179 #ifdef HAVE_SERVER_SUPPORT
180         kobj = kset_find_obj(lustre_kset, name);
181         if (kobj)
182                 return kobj;
183 #endif
184         OBD_ALLOC(kobj, sizeof(*kobj));
185         if (!kobj)
186                 return ERR_PTR(-ENOMEM);
187
188         kobj->kset = lustre_kset;
189         kobject_init(kobj, &class_ktype);
190         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
191         if (rc) {
192                 kobject_put(kobj);
193                 return ERR_PTR(rc);
194         }
195         return kobj;
196 }
197 EXPORT_SYMBOL(class_setup_tunables);
198
199 #define CLASS_MAX_NAME 1024
200
201 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
202                         bool enable_proc, struct lprocfs_vars *vars,
203                         const char *name, struct lu_device_type *ldt)
204 {
205         struct obd_type *type;
206 #ifdef HAVE_SERVER_SUPPORT
207         struct qstr dname;
208 #endif /* HAVE_SERVER_SUPPORT */
209         int rc = 0;
210
211         ENTRY;
212         /* sanity check */
213         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
214
215         if (class_search_type(name)) {
216                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
217                 RETURN(-EEXIST);
218         }
219
220         rc = -ENOMEM;
221         OBD_ALLOC(type, sizeof(*type));
222         if (type == NULL)
223                 RETURN(rc);
224
225         OBD_ALLOC_PTR(type->typ_dt_ops);
226         OBD_ALLOC_PTR(type->typ_md_ops);
227         OBD_ALLOC(type->typ_name, strlen(name) + 1);
228
229         if (type->typ_dt_ops == NULL ||
230             type->typ_md_ops == NULL ||
231             type->typ_name == NULL)
232                 GOTO (failed, rc);
233
234         *(type->typ_dt_ops) = *dt_ops;
235         /* md_ops is optional */
236         if (md_ops)
237                 *(type->typ_md_ops) = *md_ops;
238         strcpy(type->typ_name, name);
239         spin_lock_init(&type->obd_type_lock);
240
241 #ifdef CONFIG_PROC_FS
242         if (enable_proc) {
243                 type->typ_procroot = lprocfs_register(type->typ_name,
244                                                       proc_lustre_root,
245                                                       vars, type);
246                 if (IS_ERR(type->typ_procroot)) {
247                         rc = PTR_ERR(type->typ_procroot);
248                         type->typ_procroot = NULL;
249                         GOTO(failed, rc);
250                 }
251         }
252 #endif
253 #ifdef HAVE_SERVER_SUPPORT
254         dname.name = name;
255         dname.len = strlen(dname.name);
256         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
257                                        dname.len);
258         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
259         if (type->typ_debugfs_entry) {
260                 dput(type->typ_debugfs_entry);
261                 type->typ_sym_filter = true;
262                 goto dir_exist;
263         }
264 #endif /* HAVE_SERVER_SUPPORT */
265
266         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
267                                                     debugfs_lustre_root,
268                                                     NULL, type);
269         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
270                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
271                                              : -ENOMEM;
272                 type->typ_debugfs_entry = NULL;
273                 GOTO(failed, rc);
274         }
275 #ifdef HAVE_SERVER_SUPPORT
276 dir_exist:
277 #endif
278         type->typ_kobj = class_setup_tunables(type->typ_name);
279         if (IS_ERR(type->typ_kobj))
280                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
281
282         if (ldt) {
283                 type->typ_lu = ldt;
284                 rc = lu_device_type_init(ldt);
285                 if (rc) {
286                         kobject_put(type->typ_kobj);
287                         GOTO(failed, rc);
288                 }
289         }
290
291         spin_lock(&obd_types_lock);
292         list_add(&type->typ_chain, &obd_types);
293         spin_unlock(&obd_types_lock);
294
295         RETURN(0);
296
297 failed:
298 #ifdef HAVE_SERVER_SUPPORT
299         if (type->typ_sym_filter)
300                 type->typ_debugfs_entry = NULL;
301 #endif
302         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
303                 ldebugfs_remove(&type->typ_debugfs_entry);
304         if (type->typ_name != NULL) {
305 #ifdef CONFIG_PROC_FS
306                 if (type->typ_procroot != NULL)
307                         remove_proc_subtree(type->typ_name, proc_lustre_root);
308 #endif
309                 OBD_FREE(type->typ_name, strlen(name) + 1);
310         }
311         if (type->typ_md_ops != NULL)
312                 OBD_FREE_PTR(type->typ_md_ops);
313         if (type->typ_dt_ops != NULL)
314                 OBD_FREE_PTR(type->typ_dt_ops);
315         OBD_FREE(type, sizeof(*type));
316         RETURN(rc);
317 }
318 EXPORT_SYMBOL(class_register_type);
319
320 int class_unregister_type(const char *name)
321 {
322         struct obd_type *type = class_search_type(name);
323         ENTRY;
324
325         if (!type) {
326                 CERROR("unknown obd type\n");
327                 RETURN(-EINVAL);
328         }
329
330         if (type->typ_refcnt) {
331                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 OBD_FREE_PTR(type->typ_dt_ops);
335                 OBD_FREE_PTR(type->typ_md_ops);
336                 RETURN(-EBUSY);
337         }
338
339         kobject_put(type->typ_kobj);
340
341         /* we do not use type->typ_procroot as for compatibility purposes
342          * other modules can share names (i.e. lod can use lov entry). so
343          * we can't reference pointer as it can get invalided when another
344          * module removes the entry */
345 #ifdef CONFIG_PROC_FS
346         if (type->typ_procroot != NULL)
347                 remove_proc_subtree(type->typ_name, proc_lustre_root);
348         if (type->typ_procsym != NULL)
349                 lprocfs_remove(&type->typ_procsym);
350 #endif
351 #ifdef HAVE_SERVER_SUPPORT
352         if (type->typ_sym_filter)
353                 type->typ_debugfs_entry = NULL;
354 #endif
355         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
356                 ldebugfs_remove(&type->typ_debugfs_entry);
357
358         if (type->typ_lu)
359                 lu_device_type_fini(type->typ_lu);
360
361         spin_lock(&obd_types_lock);
362         list_del(&type->typ_chain);
363         spin_unlock(&obd_types_lock);
364         OBD_FREE(type->typ_name, strlen(name) + 1);
365         if (type->typ_dt_ops != NULL)
366                 OBD_FREE_PTR(type->typ_dt_ops);
367         if (type->typ_md_ops != NULL)
368                 OBD_FREE_PTR(type->typ_md_ops);
369         OBD_FREE(type, sizeof(*type));
370         RETURN(0);
371 } /* class_unregister_type */
372 EXPORT_SYMBOL(class_unregister_type);
373
374 /**
375  * Create a new obd device.
376  *
377  * Allocate the new obd_device and initialize it.
378  *
379  * \param[in] type_name obd device type string.
380  * \param[in] name      obd device name.
381  * \param[in] uuid      obd device UUID
382  *
383  * \retval newdev         pointer to created obd_device
384  * \retval ERR_PTR(errno) on error
385  */
386 struct obd_device *class_newdev(const char *type_name, const char *name,
387                                 const char *uuid)
388 {
389         struct obd_device *newdev;
390         struct obd_type *type = NULL;
391         ENTRY;
392
393         if (strlen(name) >= MAX_OBD_NAME) {
394                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
395                 RETURN(ERR_PTR(-EINVAL));
396         }
397
398         type = class_get_type(type_name);
399         if (type == NULL){
400                 CERROR("OBD: unknown type: %s\n", type_name);
401                 RETURN(ERR_PTR(-ENODEV));
402         }
403
404         newdev = obd_device_alloc();
405         if (newdev == NULL) {
406                 class_put_type(type);
407                 RETURN(ERR_PTR(-ENOMEM));
408         }
409         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
410         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
411         newdev->obd_type = type;
412         newdev->obd_minor = -1;
413
414         rwlock_init(&newdev->obd_pool_lock);
415         newdev->obd_pool_limit = 0;
416         newdev->obd_pool_slv = 0;
417
418         INIT_LIST_HEAD(&newdev->obd_exports);
419         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
420         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
421         INIT_LIST_HEAD(&newdev->obd_exports_timed);
422         INIT_LIST_HEAD(&newdev->obd_nid_stats);
423         spin_lock_init(&newdev->obd_nid_lock);
424         spin_lock_init(&newdev->obd_dev_lock);
425         mutex_init(&newdev->obd_dev_mutex);
426         spin_lock_init(&newdev->obd_osfs_lock);
427         /* newdev->obd_osfs_age must be set to a value in the distant
428          * past to guarantee a fresh statfs is fetched on mount. */
429         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
430
431         /* XXX belongs in setup not attach  */
432         init_rwsem(&newdev->obd_observer_link_sem);
433         /* recovery data */
434         spin_lock_init(&newdev->obd_recovery_task_lock);
435         init_waitqueue_head(&newdev->obd_next_transno_waitq);
436         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
437         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
438         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
439         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
440         INIT_LIST_HEAD(&newdev->obd_evict_list);
441         INIT_LIST_HEAD(&newdev->obd_lwp_list);
442
443         llog_group_init(&newdev->obd_olg);
444         /* Detach drops this */
445         atomic_set(&newdev->obd_refcount, 1);
446         lu_ref_init(&newdev->obd_reference);
447         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
448
449         newdev->obd_conn_inprogress = 0;
450
451         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
452
453         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
454                newdev->obd_name, newdev);
455
456         return newdev;
457 }
458
459 /**
460  * Free obd device.
461  *
462  * \param[in] obd obd_device to be freed
463  *
464  * \retval none
465  */
466 void class_free_dev(struct obd_device *obd)
467 {
468         struct obd_type *obd_type = obd->obd_type;
469
470         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
471                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
472         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
473                  "obd %p != obd_devs[%d] %p\n",
474                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
475         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
476                  "obd_refcount should be 0, not %d\n",
477                  atomic_read(&obd->obd_refcount));
478         LASSERT(obd_type != NULL);
479
480         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
481                obd->obd_name, obd->obd_type->typ_name);
482
483         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
484                          obd->obd_name, obd->obd_uuid.uuid);
485         if (obd->obd_stopping) {
486                 int err;
487
488                 /* If we're not stopping, we were never set up */
489                 err = obd_cleanup(obd);
490                 if (err)
491                         CERROR("Cleanup %s returned %d\n",
492                                 obd->obd_name, err);
493         }
494
495         obd_device_free(obd);
496
497         class_put_type(obd_type);
498 }
499
500 /**
501  * Unregister obd device.
502  *
503  * Free slot in obd_dev[] used by \a obd.
504  *
505  * \param[in] new_obd obd_device to be unregistered
506  *
507  * \retval none
508  */
509 void class_unregister_device(struct obd_device *obd)
510 {
511         write_lock(&obd_dev_lock);
512         if (obd->obd_minor >= 0) {
513                 LASSERT(obd_devs[obd->obd_minor] == obd);
514                 obd_devs[obd->obd_minor] = NULL;
515                 obd->obd_minor = -1;
516         }
517         write_unlock(&obd_dev_lock);
518 }
519
520 /**
521  * Register obd device.
522  *
523  * Find free slot in obd_devs[], fills it with \a new_obd.
524  *
525  * \param[in] new_obd obd_device to be registered
526  *
527  * \retval 0          success
528  * \retval -EEXIST    device with this name is registered
529  * \retval -EOVERFLOW obd_devs[] is full
530  */
531 int class_register_device(struct obd_device *new_obd)
532 {
533         int ret = 0;
534         int i;
535         int new_obd_minor = 0;
536         bool minor_assign = false;
537         bool retried = false;
538
539 again:
540         write_lock(&obd_dev_lock);
541         for (i = 0; i < class_devno_max(); i++) {
542                 struct obd_device *obd = class_num2obd(i);
543
544                 if (obd != NULL &&
545                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
546
547                         if (!retried) {
548                                 write_unlock(&obd_dev_lock);
549
550                                 /* the obd_device could be waited to be
551                                  * destroyed by the "obd_zombie_impexp_thread".
552                                  */
553                                 obd_zombie_barrier();
554                                 retried = true;
555                                 goto again;
556                         }
557
558                         CERROR("%s: already exists, won't add\n",
559                                obd->obd_name);
560                         /* in case we found a free slot before duplicate */
561                         minor_assign = false;
562                         ret = -EEXIST;
563                         break;
564                 }
565                 if (!minor_assign && obd == NULL) {
566                         new_obd_minor = i;
567                         minor_assign = true;
568                 }
569         }
570
571         if (minor_assign) {
572                 new_obd->obd_minor = new_obd_minor;
573                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
574                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
575                 obd_devs[new_obd_minor] = new_obd;
576         } else {
577                 if (ret == 0) {
578                         ret = -EOVERFLOW;
579                         CERROR("%s: all %u/%u devices used, increase "
580                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
581                                i, class_devno_max(), ret);
582                 }
583         }
584         write_unlock(&obd_dev_lock);
585
586         RETURN(ret);
587 }
588
589 static int class_name2dev_nolock(const char *name)
590 {
591         int i;
592
593         if (!name)
594                 return -1;
595
596         for (i = 0; i < class_devno_max(); i++) {
597                 struct obd_device *obd = class_num2obd(i);
598
599                 if (obd && strcmp(name, obd->obd_name) == 0) {
600                         /* Make sure we finished attaching before we give
601                            out any references */
602                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
603                         if (obd->obd_attached) {
604                                 return i;
605                         }
606                         break;
607                 }
608         }
609
610         return -1;
611 }
612
613 int class_name2dev(const char *name)
614 {
615         int i;
616
617         if (!name)
618                 return -1;
619
620         read_lock(&obd_dev_lock);
621         i = class_name2dev_nolock(name);
622         read_unlock(&obd_dev_lock);
623
624         return i;
625 }
626 EXPORT_SYMBOL(class_name2dev);
627
628 struct obd_device *class_name2obd(const char *name)
629 {
630         int dev = class_name2dev(name);
631
632         if (dev < 0 || dev > class_devno_max())
633                 return NULL;
634         return class_num2obd(dev);
635 }
636 EXPORT_SYMBOL(class_name2obd);
637
638 int class_uuid2dev_nolock(struct obd_uuid *uuid)
639 {
640         int i;
641
642         for (i = 0; i < class_devno_max(); i++) {
643                 struct obd_device *obd = class_num2obd(i);
644
645                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
646                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
647                         return i;
648                 }
649         }
650
651         return -1;
652 }
653
654 int class_uuid2dev(struct obd_uuid *uuid)
655 {
656         int i;
657
658         read_lock(&obd_dev_lock);
659         i = class_uuid2dev_nolock(uuid);
660         read_unlock(&obd_dev_lock);
661
662         return i;
663 }
664 EXPORT_SYMBOL(class_uuid2dev);
665
666 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
667 {
668         int dev = class_uuid2dev(uuid);
669         if (dev < 0)
670                 return NULL;
671         return class_num2obd(dev);
672 }
673 EXPORT_SYMBOL(class_uuid2obd);
674
675 /**
676  * Get obd device from ::obd_devs[]
677  *
678  * \param num [in] array index
679  *
680  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
681  *         otherwise return the obd device there.
682  */
683 struct obd_device *class_num2obd(int num)
684 {
685         struct obd_device *obd = NULL;
686
687         if (num < class_devno_max()) {
688                 obd = obd_devs[num];
689                 if (obd == NULL)
690                         return NULL;
691
692                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
693                          "%p obd_magic %08x != %08x\n",
694                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
695                 LASSERTF(obd->obd_minor == num,
696                          "%p obd_minor %0d != %0d\n",
697                          obd, obd->obd_minor, num);
698         }
699
700         return obd;
701 }
702
703 /**
704  * Find obd in obd_dev[] by name or uuid.
705  *
706  * Increment obd's refcount if found.
707  *
708  * \param[in] str obd name or uuid
709  *
710  * \retval NULL    if not found
711  * \retval target  pointer to found obd_device
712  */
713 struct obd_device *class_dev_by_str(const char *str)
714 {
715         struct obd_device *target = NULL;
716         struct obd_uuid tgtuuid;
717         int rc;
718
719         obd_str2uuid(&tgtuuid, str);
720
721         read_lock(&obd_dev_lock);
722         rc = class_uuid2dev_nolock(&tgtuuid);
723         if (rc < 0)
724                 rc = class_name2dev_nolock(str);
725
726         if (rc >= 0)
727                 target = class_num2obd(rc);
728
729         if (target != NULL)
730                 class_incref(target, "find", current);
731         read_unlock(&obd_dev_lock);
732
733         RETURN(target);
734 }
735 EXPORT_SYMBOL(class_dev_by_str);
736
737 /**
738  * Get obd devices count. Device in any
739  *    state are counted
740  * \retval obd device count
741  */
742 int get_devices_count(void)
743 {
744         int index, max_index = class_devno_max(), dev_count = 0;
745
746         read_lock(&obd_dev_lock);
747         for (index = 0; index <= max_index; index++) {
748                 struct obd_device *obd = class_num2obd(index);
749                 if (obd != NULL)
750                         dev_count++;
751         }
752         read_unlock(&obd_dev_lock);
753
754         return dev_count;
755 }
756 EXPORT_SYMBOL(get_devices_count);
757
758 void class_obd_list(void)
759 {
760         char *status;
761         int i;
762
763         read_lock(&obd_dev_lock);
764         for (i = 0; i < class_devno_max(); i++) {
765                 struct obd_device *obd = class_num2obd(i);
766
767                 if (obd == NULL)
768                         continue;
769                 if (obd->obd_stopping)
770                         status = "ST";
771                 else if (obd->obd_set_up)
772                         status = "UP";
773                 else if (obd->obd_attached)
774                         status = "AT";
775                 else
776                         status = "--";
777                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
778                          i, status, obd->obd_type->typ_name,
779                          obd->obd_name, obd->obd_uuid.uuid,
780                          atomic_read(&obd->obd_refcount));
781         }
782         read_unlock(&obd_dev_lock);
783         return;
784 }
785
786 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
787    specified, then only the client with that uuid is returned,
788    otherwise any client connected to the tgt is returned. */
789 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
790                                           const char * typ_name,
791                                           struct obd_uuid *grp_uuid)
792 {
793         int i;
794
795         read_lock(&obd_dev_lock);
796         for (i = 0; i < class_devno_max(); i++) {
797                 struct obd_device *obd = class_num2obd(i);
798
799                 if (obd == NULL)
800                         continue;
801                 if ((strncmp(obd->obd_type->typ_name, typ_name,
802                              strlen(typ_name)) == 0)) {
803                         if (obd_uuid_equals(tgt_uuid,
804                                             &obd->u.cli.cl_target_uuid) &&
805                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
806                                                          &obd->obd_uuid) : 1)) {
807                                 read_unlock(&obd_dev_lock);
808                                 return obd;
809                         }
810                 }
811         }
812         read_unlock(&obd_dev_lock);
813
814         return NULL;
815 }
816 EXPORT_SYMBOL(class_find_client_obd);
817
818 /* Iterate the obd_device list looking devices have grp_uuid. Start
819    searching at *next, and if a device is found, the next index to look
820    at is saved in *next. If next is NULL, then the first matching device
821    will always be returned. */
822 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
823 {
824         int i;
825
826         if (next == NULL)
827                 i = 0;
828         else if (*next >= 0 && *next < class_devno_max())
829                 i = *next;
830         else
831                 return NULL;
832
833         read_lock(&obd_dev_lock);
834         for (; i < class_devno_max(); i++) {
835                 struct obd_device *obd = class_num2obd(i);
836
837                 if (obd == NULL)
838                         continue;
839                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
840                         if (next != NULL)
841                                 *next = i+1;
842                         read_unlock(&obd_dev_lock);
843                         return obd;
844                 }
845         }
846         read_unlock(&obd_dev_lock);
847
848         return NULL;
849 }
850 EXPORT_SYMBOL(class_devices_in_group);
851
852 /**
853  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
854  * adjust sptlrpc settings accordingly.
855  */
856 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
857 {
858         struct obd_device  *obd;
859         const char         *type;
860         int                 i, rc = 0, rc2;
861
862         LASSERT(namelen > 0);
863
864         read_lock(&obd_dev_lock);
865         for (i = 0; i < class_devno_max(); i++) {
866                 obd = class_num2obd(i);
867
868                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
869                         continue;
870
871                 /* only notify mdc, osc, osp, lwp, mdt, ost
872                  * because only these have a -sptlrpc llog */
873                 type = obd->obd_type->typ_name;
874                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
875                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
876                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
877                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
878                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
879                     strcmp(type, LUSTRE_OST_NAME) != 0)
880                         continue;
881
882                 if (strncmp(obd->obd_name, fsname, namelen))
883                         continue;
884
885                 class_incref(obd, __FUNCTION__, obd);
886                 read_unlock(&obd_dev_lock);
887                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
888                                          sizeof(KEY_SPTLRPC_CONF),
889                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
890                 rc = rc ? rc : rc2;
891                 class_decref(obd, __FUNCTION__, obd);
892                 read_lock(&obd_dev_lock);
893         }
894         read_unlock(&obd_dev_lock);
895         return rc;
896 }
897 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
898
899 void obd_cleanup_caches(void)
900 {
901         ENTRY;
902         if (obd_device_cachep) {
903                 kmem_cache_destroy(obd_device_cachep);
904                 obd_device_cachep = NULL;
905         }
906         if (obdo_cachep) {
907                 kmem_cache_destroy(obdo_cachep);
908                 obdo_cachep = NULL;
909         }
910         if (import_cachep) {
911                 kmem_cache_destroy(import_cachep);
912                 import_cachep = NULL;
913         }
914
915         EXIT;
916 }
917
918 int obd_init_caches(void)
919 {
920         int rc;
921         ENTRY;
922
923         LASSERT(obd_device_cachep == NULL);
924         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
925                                               sizeof(struct obd_device),
926                                               0, 0, NULL);
927         if (!obd_device_cachep)
928                 GOTO(out, rc = -ENOMEM);
929
930         LASSERT(obdo_cachep == NULL);
931         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
932                                         0, 0, NULL);
933         if (!obdo_cachep)
934                 GOTO(out, rc = -ENOMEM);
935
936         LASSERT(import_cachep == NULL);
937         import_cachep = kmem_cache_create("ll_import_cache",
938                                           sizeof(struct obd_import),
939                                           0, 0, NULL);
940         if (!import_cachep)
941                 GOTO(out, rc = -ENOMEM);
942
943         RETURN(0);
944 out:
945         obd_cleanup_caches();
946         RETURN(rc);
947 }
948
949 /* map connection to client */
950 struct obd_export *class_conn2export(struct lustre_handle *conn)
951 {
952         struct obd_export *export;
953         ENTRY;
954
955         if (!conn) {
956                 CDEBUG(D_CACHE, "looking for null handle\n");
957                 RETURN(NULL);
958         }
959
960         if (conn->cookie == -1) {  /* this means assign a new connection */
961                 CDEBUG(D_CACHE, "want a new connection\n");
962                 RETURN(NULL);
963         }
964
965         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
966         export = class_handle2object(conn->cookie, NULL);
967         RETURN(export);
968 }
969 EXPORT_SYMBOL(class_conn2export);
970
971 struct obd_device *class_exp2obd(struct obd_export *exp)
972 {
973         if (exp)
974                 return exp->exp_obd;
975         return NULL;
976 }
977 EXPORT_SYMBOL(class_exp2obd);
978
979 struct obd_device *class_conn2obd(struct lustre_handle *conn)
980 {
981         struct obd_export *export;
982         export = class_conn2export(conn);
983         if (export) {
984                 struct obd_device *obd = export->exp_obd;
985                 class_export_put(export);
986                 return obd;
987         }
988         return NULL;
989 }
990
991 struct obd_import *class_exp2cliimp(struct obd_export *exp)
992 {
993         struct obd_device *obd = exp->exp_obd;
994         if (obd == NULL)
995                 return NULL;
996         return obd->u.cli.cl_import;
997 }
998 EXPORT_SYMBOL(class_exp2cliimp);
999
1000 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1001 {
1002         struct obd_device *obd = class_conn2obd(conn);
1003         if (obd == NULL)
1004                 return NULL;
1005         return obd->u.cli.cl_import;
1006 }
1007
1008 /* Export management functions */
1009 static void class_export_destroy(struct obd_export *exp)
1010 {
1011         struct obd_device *obd = exp->exp_obd;
1012         ENTRY;
1013
1014         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1015         LASSERT(obd != NULL);
1016
1017         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1018                exp->exp_client_uuid.uuid, obd->obd_name);
1019
1020         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1021         if (exp->exp_connection)
1022                 ptlrpc_put_connection_superhack(exp->exp_connection);
1023
1024         LASSERT(list_empty(&exp->exp_outstanding_replies));
1025         LASSERT(list_empty(&exp->exp_uncommitted_replies));
1026         LASSERT(list_empty(&exp->exp_req_replay_queue));
1027         LASSERT(list_empty(&exp->exp_hp_rpcs));
1028         obd_destroy_export(exp);
1029         /* self export doesn't hold a reference to an obd, although it
1030          * exists until freeing of the obd */
1031         if (exp != obd->obd_self_export)
1032                 class_decref(obd, "export", exp);
1033
1034         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1035         EXIT;
1036 }
1037
1038 static void export_handle_addref(void *export)
1039 {
1040         class_export_get(export);
1041 }
1042
1043 static struct portals_handle_ops export_handle_ops = {
1044         .hop_addref = export_handle_addref,
1045         .hop_free   = NULL,
1046 };
1047
1048 struct obd_export *class_export_get(struct obd_export *exp)
1049 {
1050         atomic_inc(&exp->exp_refcount);
1051         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1052                atomic_read(&exp->exp_refcount));
1053         return exp;
1054 }
1055 EXPORT_SYMBOL(class_export_get);
1056
1057 void class_export_put(struct obd_export *exp)
1058 {
1059         LASSERT(exp != NULL);
1060         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1061         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1062                atomic_read(&exp->exp_refcount) - 1);
1063
1064         if (atomic_dec_and_test(&exp->exp_refcount)) {
1065                 struct obd_device *obd = exp->exp_obd;
1066
1067                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1068                        exp, exp->exp_client_uuid.uuid);
1069
1070                 /* release nid stat refererence */
1071                 lprocfs_exp_cleanup(exp);
1072
1073                 if (exp == obd->obd_self_export) {
1074                         /* self export should be destroyed without
1075                          * zombie thread as it doesn't hold a
1076                          * reference to obd and doesn't hold any
1077                          * resources */
1078                         class_export_destroy(exp);
1079                         /* self export is destroyed, no class
1080                          * references exist and it is safe to free
1081                          * obd */
1082                         class_free_dev(obd);
1083                 } else {
1084                         LASSERT(!list_empty(&exp->exp_obd_chain));
1085                         obd_zombie_export_add(exp);
1086                 }
1087
1088         }
1089 }
1090 EXPORT_SYMBOL(class_export_put);
1091
1092 static void obd_zombie_exp_cull(struct work_struct *ws)
1093 {
1094         struct obd_export *export;
1095
1096         export = container_of(ws, struct obd_export, exp_zombie_work);
1097         class_export_destroy(export);
1098 }
1099
1100 /* Creates a new export, adds it to the hash table, and returns a
1101  * pointer to it. The refcount is 2: one for the hash reference, and
1102  * one for the pointer returned by this function. */
1103 struct obd_export *__class_new_export(struct obd_device *obd,
1104                                       struct obd_uuid *cluuid, bool is_self)
1105 {
1106         struct obd_export *export;
1107         struct cfs_hash *hash = NULL;
1108         int rc = 0;
1109         ENTRY;
1110
1111         OBD_ALLOC_PTR(export);
1112         if (!export)
1113                 return ERR_PTR(-ENOMEM);
1114
1115         export->exp_conn_cnt = 0;
1116         export->exp_lock_hash = NULL;
1117         export->exp_flock_hash = NULL;
1118         /* 2 = class_handle_hash + last */
1119         atomic_set(&export->exp_refcount, 2);
1120         atomic_set(&export->exp_rpc_count, 0);
1121         atomic_set(&export->exp_cb_count, 0);
1122         atomic_set(&export->exp_locks_count, 0);
1123 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1124         INIT_LIST_HEAD(&export->exp_locks_list);
1125         spin_lock_init(&export->exp_locks_list_guard);
1126 #endif
1127         atomic_set(&export->exp_replay_count, 0);
1128         export->exp_obd = obd;
1129         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1130         spin_lock_init(&export->exp_uncommitted_replies_lock);
1131         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1132         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1133         INIT_LIST_HEAD(&export->exp_handle.h_link);
1134         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1135         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1136         class_handle_hash(&export->exp_handle, &export_handle_ops);
1137         export->exp_last_request_time = ktime_get_real_seconds();
1138         spin_lock_init(&export->exp_lock);
1139         spin_lock_init(&export->exp_rpc_lock);
1140         INIT_HLIST_NODE(&export->exp_uuid_hash);
1141         INIT_HLIST_NODE(&export->exp_nid_hash);
1142         INIT_HLIST_NODE(&export->exp_gen_hash);
1143         spin_lock_init(&export->exp_bl_list_lock);
1144         INIT_LIST_HEAD(&export->exp_bl_list);
1145         INIT_LIST_HEAD(&export->exp_stale_list);
1146         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1147
1148         export->exp_sp_peer = LUSTRE_SP_ANY;
1149         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1150         export->exp_client_uuid = *cluuid;
1151         obd_init_export(export);
1152
1153         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1154                 spin_lock(&obd->obd_dev_lock);
1155                 /* shouldn't happen, but might race */
1156                 if (obd->obd_stopping)
1157                         GOTO(exit_unlock, rc = -ENODEV);
1158
1159                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1160                 if (hash == NULL)
1161                         GOTO(exit_unlock, rc = -ENODEV);
1162                 spin_unlock(&obd->obd_dev_lock);
1163
1164                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1165                 if (rc != 0) {
1166                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1167                                       obd->obd_name, cluuid->uuid, rc);
1168                         GOTO(exit_err, rc = -EALREADY);
1169                 }
1170         }
1171
1172         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1173         spin_lock(&obd->obd_dev_lock);
1174         if (obd->obd_stopping) {
1175                 if (hash)
1176                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1177                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1178         }
1179
1180         if (!is_self) {
1181                 class_incref(obd, "export", export);
1182                 list_add_tail(&export->exp_obd_chain_timed,
1183                               &obd->obd_exports_timed);
1184                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1185                 obd->obd_num_exports++;
1186         } else {
1187                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1188                 INIT_LIST_HEAD(&export->exp_obd_chain);
1189         }
1190         spin_unlock(&obd->obd_dev_lock);
1191         if (hash)
1192                 cfs_hash_putref(hash);
1193         RETURN(export);
1194
1195 exit_unlock:
1196         spin_unlock(&obd->obd_dev_lock);
1197 exit_err:
1198         if (hash)
1199                 cfs_hash_putref(hash);
1200         class_handle_unhash(&export->exp_handle);
1201         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1202         obd_destroy_export(export);
1203         OBD_FREE_PTR(export);
1204         return ERR_PTR(rc);
1205 }
1206
1207 struct obd_export *class_new_export(struct obd_device *obd,
1208                                     struct obd_uuid *uuid)
1209 {
1210         return __class_new_export(obd, uuid, false);
1211 }
1212 EXPORT_SYMBOL(class_new_export);
1213
1214 struct obd_export *class_new_export_self(struct obd_device *obd,
1215                                          struct obd_uuid *uuid)
1216 {
1217         return __class_new_export(obd, uuid, true);
1218 }
1219
1220 void class_unlink_export(struct obd_export *exp)
1221 {
1222         class_handle_unhash(&exp->exp_handle);
1223
1224         if (exp->exp_obd->obd_self_export == exp) {
1225                 class_export_put(exp);
1226                 return;
1227         }
1228
1229         spin_lock(&exp->exp_obd->obd_dev_lock);
1230         /* delete an uuid-export hashitem from hashtables */
1231         if (!hlist_unhashed(&exp->exp_uuid_hash))
1232                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1233                              &exp->exp_client_uuid,
1234                              &exp->exp_uuid_hash);
1235
1236 #ifdef HAVE_SERVER_SUPPORT
1237         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1238                 struct tg_export_data   *ted = &exp->exp_target_data;
1239                 struct cfs_hash         *hash;
1240
1241                 /* Because obd_gen_hash will not be released until
1242                  * class_cleanup(), so hash should never be NULL here */
1243                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1244                 LASSERT(hash != NULL);
1245                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1246                              &exp->exp_gen_hash);
1247                 cfs_hash_putref(hash);
1248         }
1249 #endif /* HAVE_SERVER_SUPPORT */
1250
1251         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1252         list_del_init(&exp->exp_obd_chain_timed);
1253         exp->exp_obd->obd_num_exports--;
1254         spin_unlock(&exp->exp_obd->obd_dev_lock);
1255         atomic_inc(&obd_stale_export_num);
1256
1257         /* A reference is kept by obd_stale_exports list */
1258         obd_stale_export_put(exp);
1259 }
1260 EXPORT_SYMBOL(class_unlink_export);
1261
1262 /* Import management functions */
1263 static void class_import_destroy(struct obd_import *imp)
1264 {
1265         ENTRY;
1266
1267         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1268                 imp->imp_obd->obd_name);
1269
1270         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1271
1272         ptlrpc_put_connection_superhack(imp->imp_connection);
1273
1274         while (!list_empty(&imp->imp_conn_list)) {
1275                 struct obd_import_conn *imp_conn;
1276
1277                 imp_conn = list_entry(imp->imp_conn_list.next,
1278                                       struct obd_import_conn, oic_item);
1279                 list_del_init(&imp_conn->oic_item);
1280                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1281                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1282         }
1283
1284         LASSERT(imp->imp_sec == NULL);
1285         class_decref(imp->imp_obd, "import", imp);
1286         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1287         EXIT;
1288 }
1289
1290 static void import_handle_addref(void *import)
1291 {
1292         class_import_get(import);
1293 }
1294
1295 static struct portals_handle_ops import_handle_ops = {
1296         .hop_addref = import_handle_addref,
1297         .hop_free   = NULL,
1298 };
1299
1300 struct obd_import *class_import_get(struct obd_import *import)
1301 {
1302         atomic_inc(&import->imp_refcount);
1303         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1304                atomic_read(&import->imp_refcount),
1305                import->imp_obd->obd_name);
1306         return import;
1307 }
1308 EXPORT_SYMBOL(class_import_get);
1309
1310 void class_import_put(struct obd_import *imp)
1311 {
1312         ENTRY;
1313
1314         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1315
1316         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1317                atomic_read(&imp->imp_refcount) - 1,
1318                imp->imp_obd->obd_name);
1319
1320         if (atomic_dec_and_test(&imp->imp_refcount)) {
1321                 CDEBUG(D_INFO, "final put import %p\n", imp);
1322                 obd_zombie_import_add(imp);
1323         }
1324
1325         /* catch possible import put race */
1326         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1327         EXIT;
1328 }
1329 EXPORT_SYMBOL(class_import_put);
1330
1331 static void init_imp_at(struct imp_at *at) {
1332         int i;
1333         at_init(&at->iat_net_latency, 0, 0);
1334         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1335                 /* max service estimates are tracked on the server side, so
1336                    don't use the AT history here, just use the last reported
1337                    val. (But keep hist for proc histogram, worst_ever) */
1338                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1339                         AT_FLG_NOHIST);
1340         }
1341 }
1342
1343 static void obd_zombie_imp_cull(struct work_struct *ws)
1344 {
1345         struct obd_import *import;
1346
1347         import = container_of(ws, struct obd_import, imp_zombie_work);
1348         class_import_destroy(import);
1349 }
1350
1351 struct obd_import *class_new_import(struct obd_device *obd)
1352 {
1353         struct obd_import *imp;
1354         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1355
1356         OBD_ALLOC(imp, sizeof(*imp));
1357         if (imp == NULL)
1358                 return NULL;
1359
1360         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1361         INIT_LIST_HEAD(&imp->imp_replay_list);
1362         INIT_LIST_HEAD(&imp->imp_sending_list);
1363         INIT_LIST_HEAD(&imp->imp_delayed_list);
1364         INIT_LIST_HEAD(&imp->imp_committed_list);
1365         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1366         imp->imp_known_replied_xid = 0;
1367         imp->imp_replay_cursor = &imp->imp_committed_list;
1368         spin_lock_init(&imp->imp_lock);
1369         imp->imp_last_success_conn = 0;
1370         imp->imp_state = LUSTRE_IMP_NEW;
1371         imp->imp_obd = class_incref(obd, "import", imp);
1372         mutex_init(&imp->imp_sec_mutex);
1373         init_waitqueue_head(&imp->imp_recovery_waitq);
1374         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1375
1376         if (curr_pid_ns->child_reaper)
1377                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1378         else
1379                 imp->imp_sec_refpid = 1;
1380
1381         atomic_set(&imp->imp_refcount, 2);
1382         atomic_set(&imp->imp_unregistering, 0);
1383         atomic_set(&imp->imp_inflight, 0);
1384         atomic_set(&imp->imp_replay_inflight, 0);
1385         atomic_set(&imp->imp_inval_count, 0);
1386         INIT_LIST_HEAD(&imp->imp_conn_list);
1387         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1388         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1389         init_imp_at(&imp->imp_at);
1390
1391         /* the default magic is V2, will be used in connect RPC, and
1392          * then adjusted according to the flags in request/reply. */
1393         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1394
1395         return imp;
1396 }
1397 EXPORT_SYMBOL(class_new_import);
1398
1399 void class_destroy_import(struct obd_import *import)
1400 {
1401         LASSERT(import != NULL);
1402         LASSERT(import != LP_POISON);
1403
1404         class_handle_unhash(&import->imp_handle);
1405
1406         spin_lock(&import->imp_lock);
1407         import->imp_generation++;
1408         spin_unlock(&import->imp_lock);
1409         class_import_put(import);
1410 }
1411 EXPORT_SYMBOL(class_destroy_import);
1412
1413 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1414
1415 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1416 {
1417         spin_lock(&exp->exp_locks_list_guard);
1418
1419         LASSERT(lock->l_exp_refs_nr >= 0);
1420
1421         if (lock->l_exp_refs_target != NULL &&
1422             lock->l_exp_refs_target != exp) {
1423                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1424                               exp, lock, lock->l_exp_refs_target);
1425         }
1426         if ((lock->l_exp_refs_nr ++) == 0) {
1427                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1428                 lock->l_exp_refs_target = exp;
1429         }
1430         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1431                lock, exp, lock->l_exp_refs_nr);
1432         spin_unlock(&exp->exp_locks_list_guard);
1433 }
1434 EXPORT_SYMBOL(__class_export_add_lock_ref);
1435
1436 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1437 {
1438         spin_lock(&exp->exp_locks_list_guard);
1439         LASSERT(lock->l_exp_refs_nr > 0);
1440         if (lock->l_exp_refs_target != exp) {
1441                 LCONSOLE_WARN("lock %p, "
1442                               "mismatching export pointers: %p, %p\n",
1443                               lock, lock->l_exp_refs_target, exp);
1444         }
1445         if (-- lock->l_exp_refs_nr == 0) {
1446                 list_del_init(&lock->l_exp_refs_link);
1447                 lock->l_exp_refs_target = NULL;
1448         }
1449         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1450                lock, exp, lock->l_exp_refs_nr);
1451         spin_unlock(&exp->exp_locks_list_guard);
1452 }
1453 EXPORT_SYMBOL(__class_export_del_lock_ref);
1454 #endif
1455
1456 /* A connection defines an export context in which preallocation can
1457    be managed. This releases the export pointer reference, and returns
1458    the export handle, so the export refcount is 1 when this function
1459    returns. */
1460 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1461                   struct obd_uuid *cluuid)
1462 {
1463         struct obd_export *export;
1464         LASSERT(conn != NULL);
1465         LASSERT(obd != NULL);
1466         LASSERT(cluuid != NULL);
1467         ENTRY;
1468
1469         export = class_new_export(obd, cluuid);
1470         if (IS_ERR(export))
1471                 RETURN(PTR_ERR(export));
1472
1473         conn->cookie = export->exp_handle.h_cookie;
1474         class_export_put(export);
1475
1476         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1477                cluuid->uuid, conn->cookie);
1478         RETURN(0);
1479 }
1480 EXPORT_SYMBOL(class_connect);
1481
1482 /* if export is involved in recovery then clean up related things */
1483 static void class_export_recovery_cleanup(struct obd_export *exp)
1484 {
1485         struct obd_device *obd = exp->exp_obd;
1486
1487         spin_lock(&obd->obd_recovery_task_lock);
1488         if (obd->obd_recovering) {
1489                 if (exp->exp_in_recovery) {
1490                         spin_lock(&exp->exp_lock);
1491                         exp->exp_in_recovery = 0;
1492                         spin_unlock(&exp->exp_lock);
1493                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1494                         atomic_dec(&obd->obd_connected_clients);
1495                 }
1496
1497                 /* if called during recovery then should update
1498                  * obd_stale_clients counter,
1499                  * lightweight exports are not counted */
1500                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1501                         exp->exp_obd->obd_stale_clients++;
1502         }
1503         spin_unlock(&obd->obd_recovery_task_lock);
1504
1505         spin_lock(&exp->exp_lock);
1506         /** Cleanup req replay fields */
1507         if (exp->exp_req_replay_needed) {
1508                 exp->exp_req_replay_needed = 0;
1509
1510                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1511                 atomic_dec(&obd->obd_req_replay_clients);
1512         }
1513
1514         /** Cleanup lock replay data */
1515         if (exp->exp_lock_replay_needed) {
1516                 exp->exp_lock_replay_needed = 0;
1517
1518                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1519                 atomic_dec(&obd->obd_lock_replay_clients);
1520         }
1521         spin_unlock(&exp->exp_lock);
1522 }
1523
1524 /* This function removes 1-3 references from the export:
1525  * 1 - for export pointer passed
1526  * and if disconnect really need
1527  * 2 - removing from hash
1528  * 3 - in client_unlink_export
1529  * The export pointer passed to this function can destroyed */
1530 int class_disconnect(struct obd_export *export)
1531 {
1532         int already_disconnected;
1533         ENTRY;
1534
1535         if (export == NULL) {
1536                 CWARN("attempting to free NULL export %p\n", export);
1537                 RETURN(-EINVAL);
1538         }
1539
1540         spin_lock(&export->exp_lock);
1541         already_disconnected = export->exp_disconnected;
1542         export->exp_disconnected = 1;
1543         /*  We hold references of export for uuid hash
1544          *  and nid_hash and export link at least. So
1545          *  it is safe to call cfs_hash_del in there.  */
1546         if (!hlist_unhashed(&export->exp_nid_hash))
1547                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1548                              &export->exp_connection->c_peer.nid,
1549                              &export->exp_nid_hash);
1550         spin_unlock(&export->exp_lock);
1551
1552         /* class_cleanup(), abort_recovery(), and class_fail_export()
1553          * all end up in here, and if any of them race we shouldn't
1554          * call extra class_export_puts(). */
1555         if (already_disconnected) {
1556                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1557                 GOTO(no_disconn, already_disconnected);
1558         }
1559
1560         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1561                export->exp_handle.h_cookie);
1562
1563         class_export_recovery_cleanup(export);
1564         class_unlink_export(export);
1565 no_disconn:
1566         class_export_put(export);
1567         RETURN(0);
1568 }
1569 EXPORT_SYMBOL(class_disconnect);
1570
1571 /* Return non-zero for a fully connected export */
1572 int class_connected_export(struct obd_export *exp)
1573 {
1574         int connected = 0;
1575
1576         if (exp) {
1577                 spin_lock(&exp->exp_lock);
1578                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1579                 spin_unlock(&exp->exp_lock);
1580         }
1581         return connected;
1582 }
1583 EXPORT_SYMBOL(class_connected_export);
1584
1585 static void class_disconnect_export_list(struct list_head *list,
1586                                          enum obd_option flags)
1587 {
1588         int rc;
1589         struct obd_export *exp;
1590         ENTRY;
1591
1592         /* It's possible that an export may disconnect itself, but
1593          * nothing else will be added to this list. */
1594         while (!list_empty(list)) {
1595                 exp = list_entry(list->next, struct obd_export,
1596                                  exp_obd_chain);
1597                 /* need for safe call CDEBUG after obd_disconnect */
1598                 class_export_get(exp);
1599
1600                 spin_lock(&exp->exp_lock);
1601                 exp->exp_flags = flags;
1602                 spin_unlock(&exp->exp_lock);
1603
1604                 if (obd_uuid_equals(&exp->exp_client_uuid,
1605                                     &exp->exp_obd->obd_uuid)) {
1606                         CDEBUG(D_HA,
1607                                "exp %p export uuid == obd uuid, don't discon\n",
1608                                exp);
1609                         /* Need to delete this now so we don't end up pointing
1610                          * to work_list later when this export is cleaned up. */
1611                         list_del_init(&exp->exp_obd_chain);
1612                         class_export_put(exp);
1613                         continue;
1614                 }
1615
1616                 class_export_get(exp);
1617                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1618                        "last request at %lld\n",
1619                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1620                        exp, exp->exp_last_request_time);
1621                 /* release one export reference anyway */
1622                 rc = obd_disconnect(exp);
1623
1624                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1625                        obd_export_nid2str(exp), exp, rc);
1626                 class_export_put(exp);
1627         }
1628         EXIT;
1629 }
1630
1631 void class_disconnect_exports(struct obd_device *obd)
1632 {
1633         struct list_head work_list;
1634         ENTRY;
1635
1636         /* Move all of the exports from obd_exports to a work list, en masse. */
1637         INIT_LIST_HEAD(&work_list);
1638         spin_lock(&obd->obd_dev_lock);
1639         list_splice_init(&obd->obd_exports, &work_list);
1640         list_splice_init(&obd->obd_delayed_exports, &work_list);
1641         spin_unlock(&obd->obd_dev_lock);
1642
1643         if (!list_empty(&work_list)) {
1644                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1645                        "disconnecting them\n", obd->obd_minor, obd);
1646                 class_disconnect_export_list(&work_list,
1647                                              exp_flags_from_obd(obd));
1648         } else
1649                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1650                        obd->obd_minor, obd);
1651         EXIT;
1652 }
1653 EXPORT_SYMBOL(class_disconnect_exports);
1654
1655 /* Remove exports that have not completed recovery.
1656  */
1657 void class_disconnect_stale_exports(struct obd_device *obd,
1658                                     int (*test_export)(struct obd_export *))
1659 {
1660         struct list_head work_list;
1661         struct obd_export *exp, *n;
1662         int evicted = 0;
1663         ENTRY;
1664
1665         INIT_LIST_HEAD(&work_list);
1666         spin_lock(&obd->obd_dev_lock);
1667         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1668                                  exp_obd_chain) {
1669                 /* don't count self-export as client */
1670                 if (obd_uuid_equals(&exp->exp_client_uuid,
1671                                     &exp->exp_obd->obd_uuid))
1672                         continue;
1673
1674                 /* don't evict clients which have no slot in last_rcvd
1675                  * (e.g. lightweight connection) */
1676                 if (exp->exp_target_data.ted_lr_idx == -1)
1677                         continue;
1678
1679                 spin_lock(&exp->exp_lock);
1680                 if (exp->exp_failed || test_export(exp)) {
1681                         spin_unlock(&exp->exp_lock);
1682                         continue;
1683                 }
1684                 exp->exp_failed = 1;
1685                 spin_unlock(&exp->exp_lock);
1686
1687                 list_move(&exp->exp_obd_chain, &work_list);
1688                 evicted++;
1689                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1690                        obd->obd_name, exp->exp_client_uuid.uuid,
1691                        obd_export_nid2str(exp));
1692                 print_export_data(exp, "EVICTING", 0, D_HA);
1693         }
1694         spin_unlock(&obd->obd_dev_lock);
1695
1696         if (evicted)
1697                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1698                               obd->obd_name, evicted);
1699
1700         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1701                                                  OBD_OPT_ABORT_RECOV);
1702         EXIT;
1703 }
1704 EXPORT_SYMBOL(class_disconnect_stale_exports);
1705
1706 void class_fail_export(struct obd_export *exp)
1707 {
1708         int rc, already_failed;
1709
1710         spin_lock(&exp->exp_lock);
1711         already_failed = exp->exp_failed;
1712         exp->exp_failed = 1;
1713         spin_unlock(&exp->exp_lock);
1714
1715         if (already_failed) {
1716                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1717                        exp, exp->exp_client_uuid.uuid);
1718                 return;
1719         }
1720
1721         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1722                exp, exp->exp_client_uuid.uuid);
1723
1724         if (obd_dump_on_timeout)
1725                 libcfs_debug_dumplog();
1726
1727         /* need for safe call CDEBUG after obd_disconnect */
1728         class_export_get(exp);
1729
1730         /* Most callers into obd_disconnect are removing their own reference
1731          * (request, for example) in addition to the one from the hash table.
1732          * We don't have such a reference here, so make one. */
1733         class_export_get(exp);
1734         rc = obd_disconnect(exp);
1735         if (rc)
1736                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1737         else
1738                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1739                        exp, exp->exp_client_uuid.uuid);
1740         class_export_put(exp);
1741 }
1742 EXPORT_SYMBOL(class_fail_export);
1743
1744 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1745 {
1746         struct cfs_hash *nid_hash;
1747         struct obd_export *doomed_exp = NULL;
1748         int exports_evicted = 0;
1749
1750         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1751
1752         spin_lock(&obd->obd_dev_lock);
1753         /* umount has run already, so evict thread should leave
1754          * its task to umount thread now */
1755         if (obd->obd_stopping) {
1756                 spin_unlock(&obd->obd_dev_lock);
1757                 return exports_evicted;
1758         }
1759         nid_hash = obd->obd_nid_hash;
1760         cfs_hash_getref(nid_hash);
1761         spin_unlock(&obd->obd_dev_lock);
1762
1763         do {
1764                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1765                 if (doomed_exp == NULL)
1766                         break;
1767
1768                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1769                          "nid %s found, wanted nid %s, requested nid %s\n",
1770                          obd_export_nid2str(doomed_exp),
1771                          libcfs_nid2str(nid_key), nid);
1772                 LASSERTF(doomed_exp != obd->obd_self_export,
1773                          "self-export is hashed by NID?\n");
1774                 exports_evicted++;
1775                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1776                               "request\n", obd->obd_name,
1777                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1778                               obd_export_nid2str(doomed_exp));
1779                 class_fail_export(doomed_exp);
1780                 class_export_put(doomed_exp);
1781         } while (1);
1782
1783         cfs_hash_putref(nid_hash);
1784
1785         if (!exports_evicted)
1786                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1787                        obd->obd_name, nid);
1788         return exports_evicted;
1789 }
1790 EXPORT_SYMBOL(obd_export_evict_by_nid);
1791
1792 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1793 {
1794         struct cfs_hash *uuid_hash;
1795         struct obd_export *doomed_exp = NULL;
1796         struct obd_uuid doomed_uuid;
1797         int exports_evicted = 0;
1798
1799         spin_lock(&obd->obd_dev_lock);
1800         if (obd->obd_stopping) {
1801                 spin_unlock(&obd->obd_dev_lock);
1802                 return exports_evicted;
1803         }
1804         uuid_hash = obd->obd_uuid_hash;
1805         cfs_hash_getref(uuid_hash);
1806         spin_unlock(&obd->obd_dev_lock);
1807
1808         obd_str2uuid(&doomed_uuid, uuid);
1809         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1810                 CERROR("%s: can't evict myself\n", obd->obd_name);
1811                 cfs_hash_putref(uuid_hash);
1812                 return exports_evicted;
1813         }
1814
1815         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1816
1817         if (doomed_exp == NULL) {
1818                 CERROR("%s: can't disconnect %s: no exports found\n",
1819                        obd->obd_name, uuid);
1820         } else {
1821                 CWARN("%s: evicting %s at adminstrative request\n",
1822                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1823                 class_fail_export(doomed_exp);
1824                 class_export_put(doomed_exp);
1825                 exports_evicted++;
1826         }
1827         cfs_hash_putref(uuid_hash);
1828
1829         return exports_evicted;
1830 }
1831
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; no callback is installed by default. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1836
1837 static void print_export_data(struct obd_export *exp, const char *status,
1838                               int locks, int debug_level)
1839 {
1840         struct ptlrpc_reply_state *rs;
1841         struct ptlrpc_reply_state *first_reply = NULL;
1842         int nreplies = 0;
1843
1844         spin_lock(&exp->exp_lock);
1845         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1846                             rs_exp_list) {
1847                 if (nreplies == 0)
1848                         first_reply = rs;
1849                 nreplies++;
1850         }
1851         spin_unlock(&exp->exp_lock);
1852
1853         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1854                "%p %s %llu stale:%d\n",
1855                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1856                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1857                atomic_read(&exp->exp_rpc_count),
1858                atomic_read(&exp->exp_cb_count),
1859                atomic_read(&exp->exp_locks_count),
1860                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1861                nreplies, first_reply, nreplies > 3 ? "..." : "",
1862                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1863 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1864         if (locks && class_export_dump_hook != NULL)
1865                 class_export_dump_hook(exp);
1866 #endif
1867 }
1868
1869 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1870 {
1871         struct obd_export *exp;
1872
1873         spin_lock(&obd->obd_dev_lock);
1874         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1875                 print_export_data(exp, "ACTIVE", locks, debug_level);
1876         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1877                 print_export_data(exp, "UNLINKED", locks, debug_level);
1878         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1879                 print_export_data(exp, "DELAYED", locks, debug_level);
1880         spin_unlock(&obd->obd_dev_lock);
1881 }
1882
1883 void obd_exports_barrier(struct obd_device *obd)
1884 {
1885         int waited = 2;
1886         LASSERT(list_empty(&obd->obd_exports));
1887         spin_lock(&obd->obd_dev_lock);
1888         while (!list_empty(&obd->obd_unlinked_exports)) {
1889                 spin_unlock(&obd->obd_dev_lock);
1890                 set_current_state(TASK_UNINTERRUPTIBLE);
1891                 schedule_timeout(cfs_time_seconds(waited));
1892                 if (waited > 5 && is_power_of_2(waited)) {
1893                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1894                                       "more than %d seconds. "
1895                                       "The obd refcount = %d. Is it stuck?\n",
1896                                       obd->obd_name, waited,
1897                                       atomic_read(&obd->obd_refcount));
1898                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1899                 }
1900                 waited *= 2;
1901                 spin_lock(&obd->obd_dev_lock);
1902         }
1903         spin_unlock(&obd->obd_dev_lock);
1904 }
1905 EXPORT_SYMBOL(obd_exports_barrier);
1906
1907 /**
1908  * Add export to the obd_zombe thread and notify it.
1909  */
1910 static void obd_zombie_export_add(struct obd_export *exp) {
1911         atomic_dec(&obd_stale_export_num);
1912         spin_lock(&exp->exp_obd->obd_dev_lock);
1913         LASSERT(!list_empty(&exp->exp_obd_chain));
1914         list_del_init(&exp->exp_obd_chain);
1915         spin_unlock(&exp->exp_obd->obd_dev_lock);
1916
1917         queue_work(zombie_wq, &exp->exp_zombie_work);
1918 }
1919
1920 /**
1921  * Add import to the obd_zombe thread and notify it.
1922  */
1923 static void obd_zombie_import_add(struct obd_import *imp) {
1924         LASSERT(imp->imp_sec == NULL);
1925
1926         queue_work(zombie_wq, &imp->imp_zombie_work);
1927 }
1928
1929 /**
1930  * wait when obd_zombie import/export queues become empty
1931  */
1932 void obd_zombie_barrier(void)
1933 {
1934         flush_workqueue(zombie_wq);
1935 }
1936 EXPORT_SYMBOL(obd_zombie_barrier);
1937
1938
1939 struct obd_export *obd_stale_export_get(void)
1940 {
1941         struct obd_export *exp = NULL;
1942         ENTRY;
1943
1944         spin_lock(&obd_stale_export_lock);
1945         if (!list_empty(&obd_stale_exports)) {
1946                 exp = list_entry(obd_stale_exports.next,
1947                                  struct obd_export, exp_stale_list);
1948                 list_del_init(&exp->exp_stale_list);
1949         }
1950         spin_unlock(&obd_stale_export_lock);
1951
1952         if (exp) {
1953                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1954                        atomic_read(&obd_stale_export_num));
1955         }
1956         RETURN(exp);
1957 }
1958 EXPORT_SYMBOL(obd_stale_export_get);
1959
1960 void obd_stale_export_put(struct obd_export *exp)
1961 {
1962         ENTRY;
1963
1964         LASSERT(list_empty(&exp->exp_stale_list));
1965         if (exp->exp_lock_hash &&
1966             atomic_read(&exp->exp_lock_hash->hs_count)) {
1967                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1968                        atomic_read(&obd_stale_export_num));
1969
1970                 spin_lock_bh(&exp->exp_bl_list_lock);
1971                 spin_lock(&obd_stale_export_lock);
1972                 /* Add to the tail if there is no blocked locks,
1973                  * to the head otherwise. */
1974                 if (list_empty(&exp->exp_bl_list))
1975                         list_add_tail(&exp->exp_stale_list,
1976                                       &obd_stale_exports);
1977                 else
1978                         list_add(&exp->exp_stale_list,
1979                                  &obd_stale_exports);
1980
1981                 spin_unlock(&obd_stale_export_lock);
1982                 spin_unlock_bh(&exp->exp_bl_list_lock);
1983         } else {
1984                 class_export_put(exp);
1985         }
1986         EXIT;
1987 }
1988 EXPORT_SYMBOL(obd_stale_export_put);
1989
1990 /**
1991  * Adjust the position of the export in the stale list,
1992  * i.e. move to the head of the list if is needed.
1993  **/
1994 void obd_stale_export_adjust(struct obd_export *exp)
1995 {
1996         LASSERT(exp != NULL);
1997         spin_lock_bh(&exp->exp_bl_list_lock);
1998         spin_lock(&obd_stale_export_lock);
1999
2000         if (!list_empty(&exp->exp_stale_list) &&
2001             !list_empty(&exp->exp_bl_list))
2002                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2003
2004         spin_unlock(&obd_stale_export_lock);
2005         spin_unlock_bh(&exp->exp_bl_list_lock);
2006 }
2007 EXPORT_SYMBOL(obd_stale_export_adjust);
2008
2009 /**
2010  * start destroy zombie import/export thread
2011  */
2012 int obd_zombie_impexp_init(void)
2013 {
2014         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
2015         if (!zombie_wq)
2016                 return -ENOMEM;
2017
2018         return 0;
2019 }
2020
2021 /**
2022  * stop destroy zombie import/export thread
2023  */
2024 void obd_zombie_impexp_stop(void)
2025 {
2026         destroy_workqueue(zombie_wq);
2027         LASSERT(list_empty(&obd_stale_exports));
2028 }
2029
2030 /***** Kernel-userspace comm helpers *******/
2031
2032 /* Get length of entire message, including header */
2033 int kuc_len(int payload_len)
2034 {
2035         return sizeof(struct kuc_hdr) + payload_len;
2036 }
2037 EXPORT_SYMBOL(kuc_len);
2038
2039 /* Get a pointer to kuc header, given a ptr to the payload
2040  * @param p Pointer to payload area
2041  * @returns Pointer to kuc header
2042  */
2043 struct kuc_hdr * kuc_ptr(void *p)
2044 {
2045         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2046         LASSERT(lh->kuc_magic == KUC_MAGIC);
2047         return lh;
2048 }
2049 EXPORT_SYMBOL(kuc_ptr);
2050
2051 /* Alloc space for a message, and fill in header
2052  * @return Pointer to payload area
2053  */
2054 void *kuc_alloc(int payload_len, int transport, int type)
2055 {
2056         struct kuc_hdr *lh;
2057         int len = kuc_len(payload_len);
2058
2059         OBD_ALLOC(lh, len);
2060         if (lh == NULL)
2061                 return ERR_PTR(-ENOMEM);
2062
2063         lh->kuc_magic = KUC_MAGIC;
2064         lh->kuc_transport = transport;
2065         lh->kuc_msgtype = type;
2066         lh->kuc_msglen = len;
2067
2068         return (void *)(lh + 1);
2069 }
2070 EXPORT_SYMBOL(kuc_alloc);
2071
/* Free a message given a pointer to its payload area; @payload_len
 * is the payload size, from which the full allocation size is derived.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);
        int total = kuc_len(payload_len);

        OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
2079
2080 struct obd_request_slot_waiter {
2081         struct list_head        orsw_entry;
2082         wait_queue_head_t       orsw_waitq;
2083         bool                    orsw_signaled;
2084 };
2085
2086 static bool obd_request_slot_avail(struct client_obd *cli,
2087                                    struct obd_request_slot_waiter *orsw)
2088 {
2089         bool avail;
2090
2091         spin_lock(&cli->cl_loi_list_lock);
2092         avail = !!list_empty(&orsw->orsw_entry);
2093         spin_unlock(&cli->cl_loi_list_lock);
2094
2095         return avail;
2096 };
2097
2098 /*
2099  * For network flow control, the RPC sponsor needs to acquire a credit
2100  * before sending the RPC. The credits count for a connection is defined
2101  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2102  * the subsequent RPC sponsors need to wait until others released their
2103  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2104  */
2105 int obd_get_request_slot(struct client_obd *cli)
2106 {
2107         struct obd_request_slot_waiter   orsw;
2108         struct l_wait_info               lwi;
2109         int                              rc;
2110
2111         spin_lock(&cli->cl_loi_list_lock);
2112         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2113                 cli->cl_rpcs_in_flight++;
2114                 spin_unlock(&cli->cl_loi_list_lock);
2115                 return 0;
2116         }
2117
2118         init_waitqueue_head(&orsw.orsw_waitq);
2119         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2120         orsw.orsw_signaled = false;
2121         spin_unlock(&cli->cl_loi_list_lock);
2122
2123         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2124         rc = l_wait_event(orsw.orsw_waitq,
2125                           obd_request_slot_avail(cli, &orsw) ||
2126                           orsw.orsw_signaled,
2127                           &lwi);
2128
2129         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2130          * freed but other (such as obd_put_request_slot) is using it. */
2131         spin_lock(&cli->cl_loi_list_lock);
2132         if (rc != 0) {
2133                 if (!orsw.orsw_signaled) {
2134                         if (list_empty(&orsw.orsw_entry))
2135                                 cli->cl_rpcs_in_flight--;
2136                         else
2137                                 list_del(&orsw.orsw_entry);
2138                 }
2139         }
2140
2141         if (orsw.orsw_signaled) {
2142                 LASSERT(list_empty(&orsw.orsw_entry));
2143
2144                 rc = -EINTR;
2145         }
2146         spin_unlock(&cli->cl_loi_list_lock);
2147
2148         return rc;
2149 }
2150 EXPORT_SYMBOL(obd_get_request_slot);
2151
2152 void obd_put_request_slot(struct client_obd *cli)
2153 {
2154         struct obd_request_slot_waiter *orsw;
2155
2156         spin_lock(&cli->cl_loi_list_lock);
2157         cli->cl_rpcs_in_flight--;
2158
2159         /* If there is free slot, wakeup the first waiter. */
2160         if (!list_empty(&cli->cl_flight_waiters) &&
2161             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2162                 orsw = list_entry(cli->cl_flight_waiters.next,
2163                                   struct obd_request_slot_waiter, orsw_entry);
2164                 list_del_init(&orsw->orsw_entry);
2165                 cli->cl_rpcs_in_flight++;
2166                 wake_up(&orsw->orsw_waitq);
2167         }
2168         spin_unlock(&cli->cl_loi_list_lock);
2169 }
2170 EXPORT_SYMBOL(obd_put_request_slot);
2171
2172 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2173 {
2174         return cli->cl_max_rpcs_in_flight;
2175 }
2176 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2177
2178 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2179 {
2180         struct obd_request_slot_waiter *orsw;
2181         __u32                           old;
2182         int                             diff;
2183         int                             i;
2184         char                            *typ_name;
2185         int                             rc;
2186
2187         if (max > OBD_MAX_RIF_MAX || max < 1)
2188                 return -ERANGE;
2189
2190         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2191         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2192                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2193                  * strictly lower that max_rpcs_in_flight */
2194                 if (max < 2) {
2195                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2196                                "because it must be higher than "
2197                                "max_mod_rpcs_in_flight value",
2198                                cli->cl_import->imp_obd->obd_name);
2199                         return -ERANGE;
2200                 }
2201                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2202                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2203                         if (rc != 0)
2204                                 return rc;
2205                 }
2206         }
2207
2208         spin_lock(&cli->cl_loi_list_lock);
2209         old = cli->cl_max_rpcs_in_flight;
2210         cli->cl_max_rpcs_in_flight = max;
2211         client_adjust_max_dirty(cli);
2212
2213         diff = max - old;
2214
2215         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2216         for (i = 0; i < diff; i++) {
2217                 if (list_empty(&cli->cl_flight_waiters))
2218                         break;
2219
2220                 orsw = list_entry(cli->cl_flight_waiters.next,
2221                                   struct obd_request_slot_waiter, orsw_entry);
2222                 list_del_init(&orsw->orsw_entry);
2223                 cli->cl_rpcs_in_flight++;
2224                 wake_up(&orsw->orsw_waitq);
2225         }
2226         spin_unlock(&cli->cl_loi_list_lock);
2227
2228         return 0;
2229 }
2230 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2231
2232 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2233 {
2234         return cli->cl_max_mod_rpcs_in_flight;
2235 }
2236 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2237
2238 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2239 {
2240         struct obd_connect_data *ocd;
2241         __u16 maxmodrpcs;
2242         __u16 prev;
2243
2244         if (max > OBD_MAX_RIF_MAX || max < 1)
2245                 return -ERANGE;
2246
2247         /* cannot exceed or equal max_rpcs_in_flight */
2248         if (max >= cli->cl_max_rpcs_in_flight) {
2249                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2250                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2251                        cli->cl_import->imp_obd->obd_name,
2252                        max, cli->cl_max_rpcs_in_flight);
2253                 return -ERANGE;
2254         }
2255
2256         /* cannot exceed max modify RPCs in flight supported by the server */
2257         ocd = &cli->cl_import->imp_connect_data;
2258         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2259                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2260         else
2261                 maxmodrpcs = 1;
2262         if (max > maxmodrpcs) {
2263                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2264                        "higher than max_mod_rpcs_per_client value (%hu) "
2265                        "returned by the server at connection\n",
2266                        cli->cl_import->imp_obd->obd_name,
2267                        max, maxmodrpcs);
2268                 return -ERANGE;
2269         }
2270
2271         spin_lock(&cli->cl_mod_rpcs_lock);
2272
2273         prev = cli->cl_max_mod_rpcs_in_flight;
2274         cli->cl_max_mod_rpcs_in_flight = max;
2275
2276         /* wakeup waiters if limit has been increased */
2277         if (cli->cl_max_mod_rpcs_in_flight > prev)
2278                 wake_up(&cli->cl_mod_rpcs_waitq);
2279
2280         spin_unlock(&cli->cl_mod_rpcs_lock);
2281
2282         return 0;
2283 }
2284 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2285
2286
2287 #define pct(a, b) (b ? a * 100 / b : 0)
2288 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2289                                struct seq_file *seq)
2290 {
2291         unsigned long mod_tot = 0, mod_cum;
2292         struct timespec64 now;
2293         int i;
2294
2295         ktime_get_real_ts64(&now);
2296
2297         spin_lock(&cli->cl_mod_rpcs_lock);
2298
2299         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2300                    (s64)now.tv_sec, now.tv_nsec);
2301         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2302                    cli->cl_mod_rpcs_in_flight);
2303
2304         seq_printf(seq, "\n\t\t\tmodify\n");
2305         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2306
2307         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2308
2309         mod_cum = 0;
2310         for (i = 0; i < OBD_HIST_MAX; i++) {
2311                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2312                 mod_cum += mod;
2313                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2314                            i, mod, pct(mod, mod_tot),
2315                            pct(mod_cum, mod_tot));
2316                 if (mod_cum == mod_tot)
2317                         break;
2318         }
2319
2320         spin_unlock(&cli->cl_mod_rpcs_lock);
2321
2322         return 0;
2323 }
2324 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2325 #undef pct
2326
2327
2328 /* The number of modify RPCs sent in parallel is limited
2329  * because the server has a finite number of slots per client to
2330  * store request result and ensure reply reconstruction when needed.
2331  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2332  * that takes into account server limit and cl_max_rpcs_in_flight
2333  * value.
2334  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2335  * one close request is allowed above the maximum.
2336  */
2337 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2338                                                  bool close_req)
2339 {
2340         bool avail;
2341
2342         /* A slot is available if
2343          * - number of modify RPCs in flight is less than the max
2344          * - it's a close RPC and no other close request is in flight
2345          */
2346         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2347                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2348
2349         return avail;
2350 }
2351
2352 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2353                                          bool close_req)
2354 {
2355         bool avail;
2356
2357         spin_lock(&cli->cl_mod_rpcs_lock);
2358         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2359         spin_unlock(&cli->cl_mod_rpcs_lock);
2360         return avail;
2361 }
2362
2363 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2364 {
2365         if (it != NULL &&
2366             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2367              it->it_op == IT_READDIR ||
2368              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2369                         return true;
2370         return false;
2371 }
2372
2373 /* Get a modify RPC slot from the obd client @cli according
2374  * to the kind of operation @opc that is going to be sent
2375  * and the intent @it of the operation if it applies.
2376  * If the maximum number of modify RPCs in flight is reached
2377  * the thread is put to sleep.
2378  * Returns the tag to be set in the request message. Tag 0
2379  * is reserved for non-modifying requests.
2380  */
2381 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2382                            struct lookup_intent *it)
2383 {
2384         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2385         bool                    close_req = false;
2386         __u16                   i, max;
2387
2388         /* read-only metadata RPCs don't consume a slot on MDT
2389          * for reply reconstruction
2390          */
2391         if (obd_skip_mod_rpc_slot(it))
2392                 return 0;
2393
2394         if (opc == MDS_CLOSE)
2395                 close_req = true;
2396
2397         do {
2398                 spin_lock(&cli->cl_mod_rpcs_lock);
2399                 max = cli->cl_max_mod_rpcs_in_flight;
2400                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2401                         /* there is a slot available */
2402                         cli->cl_mod_rpcs_in_flight++;
2403                         if (close_req)
2404                                 cli->cl_close_rpcs_in_flight++;
2405                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2406                                          cli->cl_mod_rpcs_in_flight);
2407                         /* find a free tag */
2408                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2409                                                 max + 1);
2410                         LASSERT(i < OBD_MAX_RIF_MAX);
2411                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2412                         spin_unlock(&cli->cl_mod_rpcs_lock);
2413                         /* tag 0 is reserved for non-modify RPCs */
2414                         return i + 1;
2415                 }
2416                 spin_unlock(&cli->cl_mod_rpcs_lock);
2417
2418                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2419                        "opc %u, max %hu\n",
2420                        cli->cl_import->imp_obd->obd_name, opc, max);
2421
2422                 l_wait_event(cli->cl_mod_rpcs_waitq,
2423                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2424         } while (true);
2425 }
2426 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2427
2428 /* Put a modify RPC slot from the obd client @cli according
2429  * to the kind of operation @opc that has been sent and the
2430  * intent @it of the operation if it applies.
2431  */
2432 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2433                           struct lookup_intent *it, __u16 tag)
2434 {
2435         bool                    close_req = false;
2436
2437         if (obd_skip_mod_rpc_slot(it))
2438                 return;
2439
2440         if (opc == MDS_CLOSE)
2441                 close_req = true;
2442
2443         spin_lock(&cli->cl_mod_rpcs_lock);
2444         cli->cl_mod_rpcs_in_flight--;
2445         if (close_req)
2446                 cli->cl_close_rpcs_in_flight--;
2447         /* release the tag in the bitmap */
2448         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2449         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2450         spin_unlock(&cli->cl_mod_rpcs_lock);
2451         wake_up(&cli->cl_mod_rpcs_waitq);
2452 }
2453 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2454