Whamcloud - gitweb
LU-1445 osp: Use FID to track precreate cache.
[fs/lustre-release.git] / lustre / osp / osp_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_dev.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  * Author: Di Wang <di.wang@intel.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_MDS
49
50 #include <obd_class.h>
51 #include <lustre_param.h>
52 #include <lustre_log.h>
53 #include <lustre_mdc.h>
54
55 #include "osp_internal.h"
56
57 /* Slab for OSP object allocation */
58 cfs_mem_cache_t *osp_object_kmem;
59
60 static struct lu_kmem_descr osp_caches[] = {
61         {
62                 .ckd_cache = &osp_object_kmem,
63                 .ckd_name  = "osp_obj",
64                 .ckd_size  = sizeof(struct osp_object)
65         },
66         {
67                 .ckd_cache = NULL
68         }
69 };
70
71 struct lu_object *osp_object_alloc(const struct lu_env *env,
72                                    const struct lu_object_header *hdr,
73                                    struct lu_device *d)
74 {
75         struct lu_object_header *h;
76         struct osp_object       *o;
77         struct lu_object        *l;
78
79         LASSERT(hdr == NULL);
80
81         OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, CFS_ALLOC_IO);
82         if (o != NULL) {
83                 l = &o->opo_obj.do_lu;
84                 h = &o->opo_header;
85
86                 lu_object_header_init(h);
87                 dt_object_init(&o->opo_obj, h, d);
88                 lu_object_add_top(h, l);
89
90                 l->lo_ops = &osp_lu_obj_ops;
91
92                 return l;
93         } else {
94                 return NULL;
95         }
96 }
97
98 static struct dt_object
99 *osp_find_or_create(const struct lu_env *env, struct osp_device *osp,
100                     struct lu_attr *attr, __u32 reg_id)
101 {
102         struct osp_thread_info *osi = osp_env_info(env);
103         struct dt_object_format dof = { 0 };
104         struct dt_object       *dto;
105         int                  rc;
106         ENTRY;
107
108         lu_local_obj_fid(&osi->osi_fid, reg_id);
109         attr->la_valid = LA_MODE;
110         attr->la_mode = S_IFREG | 0644;
111         dof.dof_type = DFT_REGULAR;
112         dto = dt_find_or_create(env, osp->opd_storage, &osi->osi_fid,
113                                 &dof, attr);
114         if (IS_ERR(dto))
115                 RETURN(dto);
116
117         rc = dt_attr_get(env, dto, attr, NULL);
118         if (rc) {
119                 CERROR("%s: can't be initialized: rc = %d\n",
120                        osp->opd_obd->obd_name, rc);
121                 lu_object_put(env, &dto->do_lu);
122                 RETURN(ERR_PTR(rc));
123         }
124         RETURN(dto);
125 }
126
127 static int osp_write_local_file(const struct lu_env *env,
128                                 struct osp_device *osp,
129                                 struct dt_object *dt_obj,
130                                 struct lu_buf *buf,
131                                 loff_t offset)
132 {
133         struct thandle *th;
134         int rc;
135
136         th = dt_trans_create(env, osp->opd_storage);
137         if (IS_ERR(th))
138                 RETURN(PTR_ERR(th));
139
140         rc = dt_declare_record_write(env, dt_obj, buf->lb_len, offset, th);
141         if (rc)
142                 GOTO(out, rc);
143         rc = dt_trans_start_local(env, osp->opd_storage, th);
144         if (rc)
145                 GOTO(out, rc);
146
147         rc = dt_record_write(env, dt_obj, buf, &offset, th);
148 out:
149         dt_trans_stop(env, osp->opd_storage, th);
150         RETURN(rc);
151 }
152
153 static int osp_init_last_objid(const struct lu_env *env, struct osp_device *osp)
154 {
155         struct osp_thread_info  *osi = osp_env_info(env);
156         struct lu_fid           *fid = &osp->opd_last_used_fid;
157         struct dt_object        *dto;
158         int                     rc;
159         ENTRY;
160
161         dto = osp_find_or_create(env, osp, &osi->osi_attr, MDD_LOV_OBJ_OID);
162         if (IS_ERR(dto))
163                 RETURN(PTR_ERR(dto));
164         /* object will be released in device cleanup path */
165         if (osi->osi_attr.la_size >=
166             sizeof(osi->osi_id) * (osp->opd_index + 1)) {
167                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid,
168                                    osp->opd_index);
169                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
170                 if (rc != 0)
171                         GOTO(out, rc);
172         } else {
173                 fid->f_oid = 0;
174                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid,
175                                    osp->opd_index);
176                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
177                                           osi->osi_off);
178         }
179         osp->opd_last_used_oid_file = dto;
180         RETURN(0);
181 out:
182         /* object will be released in device cleanup path */
183         CERROR("%s: can't initialize lov_objid: rc = %d\n",
184                osp->opd_obd->obd_name, rc);
185         lu_object_put(env, &dto->do_lu);
186         osp->opd_last_used_oid_file = NULL;
187         RETURN(rc);
188 }
189
190 static int osp_init_last_seq(const struct lu_env *env, struct osp_device *osp)
191 {
192         struct osp_thread_info  *osi = osp_env_info(env);
193         struct lu_fid           *fid = &osp->opd_last_used_fid;
194         struct dt_object        *dto;
195         int                     rc;
196         ENTRY;
197
198         dto = osp_find_or_create(env, osp, &osi->osi_attr, MDD_LOV_OBJ_OSEQ);
199         if (IS_ERR(dto))
200                 RETURN(PTR_ERR(dto));
201
202         /* object will be released in device cleanup path */
203         if (osi->osi_attr.la_size >=
204             sizeof(osi->osi_id) * (osp->opd_index + 1)) {
205                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
206                                    osp->opd_index);
207                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
208                 if (rc != 0)
209                         GOTO(out, rc);
210         } else {
211                 fid->f_seq = 0;
212                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
213                                     osp->opd_index);
214                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
215                                           osi->osi_off);
216         }
217         osp->opd_last_used_seq_file = dto;
218         RETURN(0);
219 out:
220         /* object will be released in device cleanup path */
221         CERROR("%s: can't initialize lov_seq: rc = %d\n",
222                osp->opd_obd->obd_name, rc);
223         lu_object_put(env, &dto->do_lu);
224         osp->opd_last_used_seq_file = NULL;
225         RETURN(rc);
226 }
227
228 static int osp_last_used_init(const struct lu_env *env, struct osp_device *osp)
229 {
230         struct osp_thread_info *osi = osp_env_info(env);
231         int                  rc;
232         ENTRY;
233
234         fid_zero(&osp->opd_last_used_fid);
235         rc = osp_init_last_objid(env, osp);
236         if (rc < 0) {
237                 CERROR("%s: Can not get ids %d from old objid!\n",
238                        osp->opd_obd->obd_name, rc);
239                 RETURN(rc);
240         }
241
242         rc = osp_init_last_seq(env, osp);
243         if (rc < 0) {
244                 CERROR("%s: Can not get ids %d from old objid!\n",
245                        osp->opd_obd->obd_name, rc);
246                 GOTO(out, rc);
247         }
248
249         if (fid_oid(&osp->opd_last_used_fid) != 0 &&
250             fid_seq(&osp->opd_last_used_fid) == 0) {
251                 /* Just upgrade from the old version,
252                  * set the seq to be IDIF */
253                 osp->opd_last_used_fid.f_seq =
254                    fid_idif_seq(fid_oid(&osp->opd_last_used_fid),
255                                 osp->opd_index);
256                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off,
257                                     &osp->opd_last_used_fid.f_seq,
258                                     osp->opd_index);
259                 rc = osp_write_local_file(env, osp, osp->opd_last_used_seq_file,
260                                           &osi->osi_lb, osi->osi_off);
261                 if (rc) {
262                         CERROR("%s : Can not write seq file: rc = %d\n",
263                                osp->opd_obd->obd_name, rc);
264                         GOTO(out, rc);
265                 }
266         }
267
268         if (!fid_is_zero(&osp->opd_last_used_fid) &&
269                  !fid_is_sane(&osp->opd_last_used_fid)) {
270                 CERROR("%s: Got invalid FID "DFID"\n", osp->opd_obd->obd_name,
271                         PFID(&osp->opd_last_used_fid));
272                 GOTO(out, rc = -EINVAL);
273         }
274
275         CDEBUG(D_INFO, "%s: Init last used fid "DFID"\n",
276                osp->opd_obd->obd_name, PFID(&osp->opd_last_used_fid));
277 out:
278         if (rc != 0) {
279                 if (osp->opd_last_used_oid_file != NULL) {
280                         lu_object_put(env, &osp->opd_last_used_oid_file->do_lu);
281                         osp->opd_last_used_oid_file = NULL;
282                 }
283                 if (osp->opd_last_used_seq_file != NULL) {
284                         lu_object_put(env, &osp->opd_last_used_seq_file->do_lu);
285                         osp->opd_last_used_seq_file = NULL;
286                 }
287         }
288
289         RETURN(rc);
290 }
291
292 static void osp_last_used_fini(const struct lu_env *env, struct osp_device *d)
293 {
294         /* release last_used file */
295         if (d->opd_last_used_oid_file != NULL) {
296                 lu_object_put(env, &d->opd_last_used_oid_file->do_lu);
297                 d->opd_last_used_oid_file = NULL;
298         }
299
300         if (d->opd_last_used_seq_file != NULL) {
301                 lu_object_put(env, &d->opd_last_used_seq_file->do_lu);
302                 d->opd_last_used_seq_file = NULL;
303         }
304 }
305
306 int osp_disconnect(struct osp_device *d)
307 {
308         struct obd_import *imp;
309         int rc = 0;
310
311         imp = d->opd_obd->u.cli.cl_import;
312
313         /* Mark import deactivated now, so we don't try to reconnect if any
314          * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
315          * fully deactivate the import, or that would drop all requests. */
316         LASSERT(imp != NULL);
317         spin_lock(&imp->imp_lock);
318         imp->imp_deactive = 1;
319         spin_unlock(&imp->imp_lock);
320
321         ptlrpc_deactivate_import(imp);
322
323         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
324          * delete it regardless.  (It's safe to delete an import that was
325          * never added.) */
326         (void)ptlrpc_pinger_del_import(imp);
327
328         rc = ptlrpc_disconnect_import(imp, 0);
329         if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ESHUTDOWN)
330                 rc = 0;
331         if (rc)
332                 CERROR("%s: can't disconnect: rc = %d\n",
333                        d->opd_obd->obd_name, rc);
334
335         ptlrpc_invalidate_import(imp);
336
337         RETURN(rc);
338 }
339
340 static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
341 {
342         int                      rc = 0;
343         ENTRY;
344
345         if (is_osp_for_connection(d->opd_obd->obd_name)) {
346                 rc = osp_disconnect(d);
347                 RETURN(rc);
348         }
349
350         LASSERT(env);
351         /* release last_used file */
352         if (!d->opd_connect_mdt)
353                 osp_last_used_fini(env, d);
354
355         rc = osp_disconnect(d);
356
357         if (!d->opd_connect_mdt) {
358                 /* stop precreate thread */
359                 osp_precreate_fini(d);
360
361                 /* stop sync thread */
362                 osp_sync_fini(d);
363         }
364
365         obd_fid_fini(d->opd_obd);
366
367         RETURN(rc);
368 }
369
370 static int osp_process_config(const struct lu_env *env,
371                               struct lu_device *dev, struct lustre_cfg *lcfg)
372 {
373         struct osp_device               *d = lu2osp_dev(dev);
374         struct lprocfs_static_vars       lvars = { 0 };
375         int                              rc;
376
377         ENTRY;
378
379         switch (lcfg->lcfg_command) {
380         case LCFG_CLEANUP:
381                 if (!is_osp_for_connection(d->opd_obd->obd_name))
382                         lu_dev_del_linkage(dev->ld_site, dev);
383                 rc = osp_shutdown(env, d);
384                 break;
385         case LCFG_PARAM:
386                 lprocfs_osp_init_vars(&lvars);
387
388                 LASSERT(d->opd_obd);
389                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
390                                               lcfg, d->opd_obd);
391                 if (rc > 0)
392                         rc = 0;
393                 if (rc == -ENOSYS) {
394                         /* class_process_proc_param() haven't found matching
395                          * parameter and returned ENOSYS so that layer(s)
396                          * below could use that. But OSP is the bottom, so
397                          * just ignore it */
398                         CERROR("%s: unknown param %s\n",
399                                (char *)lustre_cfg_string(lcfg, 0),
400                                (char *)lustre_cfg_string(lcfg, 1));
401                         rc = 0;
402                 }
403                 break;
404         default:
405                 CERROR("%s: unknown command %u\n",
406                        (char *)lustre_cfg_string(lcfg, 0), lcfg->lcfg_command);
407                 rc = 0;
408                 break;
409         }
410
411         RETURN(rc);
412 }
413
414 static int osp_recovery_complete(const struct lu_env *env,
415                                  struct lu_device *dev)
416 {
417         struct osp_device       *osp = lu2osp_dev(dev);
418         int                      rc = 0;
419
420         ENTRY;
421         osp->opd_recovery_completed = 1;
422         if (!osp->opd_connect_mdt)
423                 cfs_waitq_signal(&osp->opd_pre_waitq);
424         RETURN(rc);
425 }
426
427 const struct lu_device_operations osp_lu_ops = {
428         .ldo_object_alloc       = osp_object_alloc,
429         .ldo_process_config     = osp_process_config,
430         .ldo_recovery_complete  = osp_recovery_complete,
431 };
432
433 /**
434  * provides with statfs from corresponded OST
435  *
436  */
437 static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
438                       struct obd_statfs *sfs)
439 {
440         struct osp_device *d = dt2osp_dev(dev);
441
442         ENTRY;
443
444         if (unlikely(d->opd_imp_active == 0))
445                 RETURN(-ENOTCONN);
446
447         /* return recently updated data */
448         *sfs = d->opd_statfs;
449
450         /*
451          * layer above osp (usually lod) can use ffree to estimate
452          * how many objects are available for immediate creation
453          */
454
455         spin_lock(&d->opd_pre_lock);
456         LASSERTF(fid_seq(&d->opd_pre_last_created_fid) ==
457                  fid_seq(&d->opd_pre_used_fid),
458                  "last_created "DFID", next_fid "DFID"\n",
459                  PFID(&d->opd_pre_last_created_fid),
460                  PFID(&d->opd_pre_used_fid));
461         sfs->os_fprecreated = fid_oid(&d->opd_pre_last_created_fid) -
462                               fid_oid(&d->opd_pre_used_fid);
463         sfs->os_fprecreated -= d->opd_pre_reserved;
464         spin_unlock(&d->opd_pre_lock);
465
466         LASSERT(sfs->os_fprecreated <= OST_MAX_PRECREATE * 2);
467
468         CDEBUG(D_OTHER, "%s: "LPU64" blocks, "LPU64" free, "LPU64" avail, "
469                LPU64" files, "LPU64" free files\n", d->opd_obd->obd_name,
470                sfs->os_blocks, sfs->os_bfree, sfs->os_bavail,
471                sfs->os_files, sfs->os_ffree);
472         RETURN(0);
473 }
474
475 static int osp_sync(const struct lu_env *env, struct dt_device *dev)
476 {
477         ENTRY;
478
479         /*
480          * XXX: wake up sync thread, command it to start flushing asap?
481          */
482
483         RETURN(0);
484 }
485
486 const struct dt_device_operations osp_dt_ops = {
487         .dt_statfs      = osp_statfs,
488         .dt_sync        = osp_sync,
489 };
490
491 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m,
492                               const char *nextdev)
493 {
494         struct obd_connect_data *data = NULL;
495         struct obd_device       *obd;
496         int                      rc;
497
498         ENTRY;
499
500         LASSERT(m->opd_storage_exp == NULL);
501
502         OBD_ALLOC_PTR(data);
503         if (data == NULL)
504                 RETURN(-ENOMEM);
505
506         obd = class_name2obd(nextdev);
507         if (obd == NULL) {
508                 CERROR("%s: can't locate next device: %s\n",
509                        m->opd_obd->obd_name, nextdev);
510                 GOTO(out, rc = -ENOTCONN);
511         }
512
513         rc = obd_connect(env, &m->opd_storage_exp, obd, &obd->obd_uuid, data,
514                          NULL);
515         if (rc) {
516                 CERROR("%s: cannot connect to next dev %s: rc = %d\n",
517                        m->opd_obd->obd_name, nextdev, rc);
518                 GOTO(out, rc);
519         }
520
521         m->opd_dt_dev.dd_lu_dev.ld_site =
522                 m->opd_storage_exp->exp_obd->obd_lu_dev->ld_site;
523         LASSERT(m->opd_dt_dev.dd_lu_dev.ld_site);
524         m->opd_storage = lu2dt_dev(m->opd_storage_exp->exp_obd->obd_lu_dev);
525
526 out:
527         OBD_FREE_PTR(data);
528         RETURN(rc);
529 }
530
531 static int osp_init0(const struct lu_env *env, struct osp_device *m,
532                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
533 {
534         struct obd_device       *obd;
535         struct obd_import       *imp;
536         class_uuid_t            uuid;
537         char                    *src, *tgt, *mdt, *osdname = NULL;
538         int                     rc, idx;
539
540         ENTRY;
541
542         obd = class_name2obd(lustre_cfg_string(cfg, 0));
543         if (obd == NULL) {
544                 CERROR("Cannot find obd with name %s\n",
545                        lustre_cfg_string(cfg, 0));
546                 RETURN(-ENODEV);
547         }
548         m->opd_obd = obd;
549
550         /* There is no record in the MDT configuration for the local disk
551          * device, so we have to extract this from elsewhere in the profile.
552          * The only information we get at setup is from the OSC records:
553          * setup 0:{fsname}-OSTxxxx-osc[-MDTxxxx] 1:lustre-OST0000_UUID 2:NID
554          * Note that 1.8 generated configs are missing the -MDTxxxx part.
555          * We need to reconstruct the name of the underlying OSD from this:
556          * {fsname}-{svname}-osd, for example "lustre-MDT0000-osd".  We
557          * also need to determine the OST index from this - will be used
558          * to calculate the offset in shared lov_objids file later */
559
560         src = lustre_cfg_string(cfg, 0);
561         if (src == NULL)
562                 RETURN(-EINVAL);
563
564         tgt = strrchr(src, '-');
565         if (tgt == NULL) {
566                 CERROR("%s: invalid target name %s\n",
567                        m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
568                 RETURN(-EINVAL);
569         }
570
571         if (strncmp(tgt, "-osc", 4) == 0) {
572                 /* Old OSC name fsname-OSTXXXX-osc */
573                 for (tgt--; tgt > src && *tgt != '-'; tgt--)
574                         ;
575                 if (tgt == src) {
576                         CERROR("%s: invalid target name %s\n",
577                                m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
578                         RETURN(-EINVAL);
579                 }
580
581                 if (strncmp(tgt, "-OST", 4) != 0) {
582                         CERROR("%s: invalid target name %s\n",
583                                m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
584                         RETURN(-EINVAL);
585                 }
586
587                 idx = simple_strtol(tgt + 4, &mdt, 16);
588                 if (mdt[0] != '-' || idx > INT_MAX || idx < 0) {
589                         CERROR("%s: invalid OST index in '%s'\n",
590                                m->opd_obd->obd_name, src);
591                         RETURN(-EINVAL);
592                 }
593                 m->opd_index = idx;
594                 idx = tgt - src;
595         } else {
596                 /* New OSC name fsname-OSTXXXX-osc-MDTXXXX */
597                 if (strncmp(tgt, "-MDT", 4) != 0 &&
598                          strncmp(tgt, "-OST", 4) != 0) {
599                         CERROR("%s: invalid target name %s\n",
600                                m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
601                         RETURN(-EINVAL);
602                 }
603
604                 if (tgt - src <= 12) {
605                         CERROR("%s: invalid target name %s\n",
606                                m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
607                         RETURN(-EINVAL);
608                 }
609
610                 if (strncmp(tgt - 12, "-MDT", 4) == 0)
611                         m->opd_connect_mdt = 1;
612
613                 idx = simple_strtol(tgt - 8, &mdt, 16);
614                 if (mdt[0] != '-' || idx > INT_MAX || idx < 0) {
615                         CERROR("%s: invalid OST index in '%s'\n",
616                                m->opd_obd->obd_name, src);
617                         RETURN(-EINVAL);
618                 }
619
620                 m->opd_index = idx;
621                 idx = tgt - src - 12;
622         }
623         /* check the fsname length, and after this everything else will fit */
624         if (idx > MTI_NAME_MAXLEN) {
625                 CERROR("%s: fsname too long in '%s'\n",
626                        m->opd_obd->obd_name, src);
627                 RETURN(-EINVAL);
628         }
629
630         OBD_ALLOC(osdname, MAX_OBD_NAME);
631         if (osdname == NULL)
632                 RETURN(-ENOMEM);
633
634         memcpy(osdname, src, idx); /* copy just the fsname part */
635         osdname[idx] = '\0';
636
637         mdt = strstr(mdt, "-MDT");
638         if (mdt == NULL) /* 1.8 configs don't have "-MDT0000" at the end */
639                 strcat(osdname, "-MDT0000");
640         else
641                 strcat(osdname, mdt);
642         strcat(osdname, "-osd");
643         CDEBUG(D_HA, "%s: connect to %s (%s)\n", obd->obd_name, osdname, src);
644
645         if (m->opd_connect_mdt) {
646                 struct client_obd *cli = &m->opd_obd->u.cli;
647
648                 OBD_ALLOC(cli->cl_rpc_lock, sizeof(*cli->cl_rpc_lock));
649                 if (!cli->cl_rpc_lock)
650                         RETURN(-ENOMEM);
651                 osp_init_rpc_lock(cli->cl_rpc_lock);
652         }
653
654         m->opd_dt_dev.dd_lu_dev.ld_ops = &osp_lu_ops;
655         m->opd_dt_dev.dd_ops = &osp_dt_ops;
656         obd->obd_lu_dev = &m->opd_dt_dev.dd_lu_dev;
657
658         rc = osp_connect_to_osd(env, m, osdname);
659         if (rc)
660                 GOTO(out_fini, rc);
661
662         rc = ptlrpcd_addref();
663         if (rc)
664                 GOTO(out_disconnect, rc);
665
666         rc = client_obd_setup(obd, cfg);
667         if (rc) {
668                 CERROR("%s: can't setup obd: %d\n", m->opd_obd->obd_name, rc);
669                 GOTO(out_ref, rc);
670         }
671
672         osp_lprocfs_init(m);
673
674         if (!m->opd_connect_mdt) {
675                 /* Initialize last id from the storage - will be
676                  * used in orphan cleanup. */
677                 rc = osp_last_used_init(env, m);
678                 if (rc)
679                         GOTO(out_proc, rc);
680                 /* Initialize precreation thread, it handles new
681                  * connections as well. */
682                 rc = osp_init_precreate(m);
683                 if (rc)
684                         GOTO(out_last_used, rc);
685                 /*
686                  * Initialize synhronization mechanism taking
687                  * care of propogating changes to OST in near
688                  * transactional manner.
689                  */
690                 rc = osp_sync_init(env, m);
691                 if (rc)
692                         GOTO(out_precreat, rc);
693
694                 rc = obd_fid_init(m->opd_obd, NULL, LUSTRE_SEQ_DATA);
695                 if (rc) {
696                         CERROR("%s: fid init error: rc = %d\n",
697                                m->opd_obd->obd_name, rc);
698                         GOTO(out, rc);
699                 }
700         }
701         /*
702          * Initiate connect to OST
703          */
704         ll_generate_random_uuid(uuid);
705         class_uuid_unparse(uuid, &m->opd_cluuid);
706
707         imp = obd->u.cli.cl_import;
708
709         rc = ptlrpc_init_import(imp);
710         if (rc)
711                 GOTO(out, rc);
712         if (osdname)
713                 OBD_FREE(osdname, MAX_OBD_NAME);
714         RETURN(0);
715
716 out:
717         if (!m->opd_connect_mdt)
718                 /* stop sync thread */
719                 osp_sync_fini(m);
720 out_precreat:
721         /* stop precreate thread */
722         if (!m->opd_connect_mdt)
723                 osp_precreate_fini(m);
724 out_last_used:
725         osp_last_used_fini(env, m);
726 out_proc:
727         ptlrpc_lprocfs_unregister_obd(obd);
728         lprocfs_obd_cleanup(obd);
729         class_destroy_import(obd->u.cli.cl_import);
730         client_obd_cleanup(obd);
731 out_ref:
732         ptlrpcd_decref();
733 out_disconnect:
734         if (m->opd_connect_mdt) {
735                 struct client_obd *cli = &m->opd_obd->u.cli;
736                 if (cli->cl_rpc_lock != NULL) {
737                         OBD_FREE_PTR(cli->cl_rpc_lock);
738                         cli->cl_rpc_lock = NULL;
739                 }
740         }
741         obd_disconnect(m->opd_storage_exp);
742 out_fini:
743         if (osdname)
744                 OBD_FREE(osdname, MAX_OBD_NAME);
745         RETURN(rc);
746 }
747
748 static struct lu_device *osp_device_free(const struct lu_env *env,
749                                          struct lu_device *lu)
750 {
751         struct osp_device *m = lu2osp_dev(lu);
752
753         ENTRY;
754
755         if (cfs_atomic_read(&lu->ld_ref) && lu->ld_site) {
756                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
757                 lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
758         }
759         dt_device_fini(&m->opd_dt_dev);
760         OBD_FREE_PTR(m);
761         RETURN(NULL);
762 }
763
764 static struct lu_device *osp_device_alloc(const struct lu_env *env,
765                                           struct lu_device_type *t,
766                                           struct lustre_cfg *lcfg)
767 {
768         struct osp_device *m;
769         struct lu_device  *l;
770
771         OBD_ALLOC_PTR(m);
772         if (m == NULL) {
773                 l = ERR_PTR(-ENOMEM);
774         } else {
775                 int rc;
776
777                 l = osp2lu_dev(m);
778                 dt_device_init(&m->opd_dt_dev, t);
779                 if (is_osp_for_connection(lustre_cfg_string(lcfg, 0)))
780                         rc = osp_init_for_ost(env, m, t, lcfg);
781                 else
782                         rc = osp_init0(env, m, t, lcfg);
783                 if (rc != 0) {
784                         osp_device_free(env, l);
785                         l = ERR_PTR(rc);
786                 }
787         }
788         return l;
789 }
790
791 static struct lu_device *osp_device_fini(const struct lu_env *env,
792                                          struct lu_device *d)
793 {
794         struct osp_device *m = lu2osp_dev(d);
795         struct obd_import *imp;
796         int                rc;
797
798         ENTRY;
799
800         if (m->opd_storage_exp)
801                 obd_disconnect(m->opd_storage_exp);
802
803         if (is_osp_for_connection(m->opd_obd->obd_name))
804                 osp_fini_for_ost(m);
805
806         imp = m->opd_obd->u.cli.cl_import;
807
808         if (imp->imp_rq_pool) {
809                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
810                 imp->imp_rq_pool = NULL;
811         }
812
813         obd_cleanup_client_import(m->opd_obd);
814
815         if (m->opd_symlink)
816                 lprocfs_remove(&m->opd_symlink);
817
818         LASSERT(m->opd_obd);
819         ptlrpc_lprocfs_unregister_obd(m->opd_obd);
820         lprocfs_obd_cleanup(m->opd_obd);
821
822         if (m->opd_connect_mdt) {
823                 struct client_obd *cli = &m->opd_obd->u.cli;
824                 if (cli->cl_rpc_lock != NULL) {
825                         OBD_FREE_PTR(cli->cl_rpc_lock);
826                         cli->cl_rpc_lock = NULL;
827                 }
828         }
829
830         rc = client_obd_cleanup(m->opd_obd);
831         LASSERTF(rc == 0, "error %d\n", rc);
832
833         ptlrpcd_decref();
834
835         RETURN(NULL);
836 }
837
/**
 * Reconnect handler for OSP; there is nothing to do, so it reports
 * success without looking at any of its arguments.
 *
 * \retval 0 always
 */
static int osp_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        /* intentionally a no-op */
        return 0;
}
846
847 /*
848  * we use exports to track all LOD users
849  */
850 static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
851                            struct obd_device *obd, struct obd_uuid *cluuid,
852                            struct obd_connect_data *data, void *localdata)
853 {
854         struct osp_device       *osp = lu2osp_dev(obd->obd_lu_dev);
855         struct obd_connect_data *ocd;
856         struct obd_import       *imp;
857         struct lustre_handle     conn;
858         int                      rc;
859
860         ENTRY;
861
862         CDEBUG(D_CONFIG, "connect #%d\n", osp->opd_connects);
863
864         rc = class_connect(&conn, obd, cluuid);
865         if (rc)
866                 RETURN(rc);
867
868         *exp = class_conn2export(&conn);
869         osp->opd_exp = *exp;
870
871         /* Why should there ever be more than 1 connect? */
872         osp->opd_connects++;
873         LASSERT(osp->opd_connects == 1);
874
875         osp->opd_exp = *exp;
876
877         imp = osp->opd_obd->u.cli.cl_import;
878         imp->imp_dlm_handle = conn;
879
880         LASSERT(data != NULL);
881         LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
882         ocd = &imp->imp_connect_data;
883         *ocd = *data;
884         if (is_osp_for_connection(osp->opd_obd->obd_name))
885                 ocd->ocd_connect_flags |= OBD_CONNECT_LIGHTWEIGHT;
886
887         imp->imp_connect_flags_orig = ocd->ocd_connect_flags;
888
889         ocd->ocd_version = LUSTRE_VERSION_CODE;
890         ocd->ocd_index = data->ocd_index;
891         imp->imp_connect_flags_orig = ocd->ocd_connect_flags;
892
893         rc = ptlrpc_connect_import(imp);
894         if (rc) {
895                 CERROR("%s: can't connect obd: rc = %d\n", obd->obd_name, rc);
896                 GOTO(out, rc);
897         }
898
899         ptlrpc_pinger_add_import(imp);
900
901         if (osp->opd_connect_mdt && data->ocd_index == 0 &&
902             !is_osp_for_connection(obd->obd_name)) {
903                 struct seq_server_site *ss;
904
905                 ss = lu_site2seq(osp2lu_dev(osp)->ld_site);
906                 ss->ss_control_exp = class_export_get(*exp);
907                 ss->ss_server_fld->lsf_control_exp = *exp;
908         }
909
910 out:
911         RETURN(rc);
912 }
913
914 /*
915  * once last export (we don't count self-export) disappeared
916  * osp can be released
917  */
918 static int osp_obd_disconnect(struct obd_export *exp)
919 {
920         struct obd_device *obd = exp->exp_obd;
921         struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
922         int                rc;
923         ENTRY;
924
925         /* Only disconnect the underlying layers on the final disconnect. */
926         LASSERT(osp->opd_connects == 1);
927         osp->opd_connects--;
928
929         rc = class_disconnect(exp);
930         if (rc) {
931                 CERROR("%s: class disconnect error: rc = %d\n",
932                        obd->obd_name, rc);
933                 RETURN(rc);
934         }
935
936         /* destroy the device */
937         if (!is_osp_for_connection(obd->obd_name))
938                 class_manual_cleanup(obd);
939
940         RETURN(rc);
941 }
942
943 /*
944  * lprocfs helpers still use OBD API, let's keep obd_statfs() support
945  */
946 static int osp_obd_statfs(const struct lu_env *env, struct obd_export *exp,
947                           struct obd_statfs *osfs, __u64 max_age, __u32 flags)
948 {
949         struct obd_statfs       *msfs;
950         struct ptlrpc_request   *req;
951         struct obd_import       *imp = NULL;
952         int                      rc;
953
954         ENTRY;
955
956         /* Since the request might also come from lprocfs, so we need
957          * sync this with client_disconnect_export Bug15684 */
958         down_read(&exp->exp_obd->u.cli.cl_sem);
959         if (exp->exp_obd->u.cli.cl_import)
960                 imp = class_import_get(exp->exp_obd->u.cli.cl_import);
961         up_read(&exp->exp_obd->u.cli.cl_sem);
962         if (!imp)
963                 RETURN(-ENODEV);
964
965         /* We could possibly pass max_age in the request (as an absolute
966          * timestamp or a "seconds.usec ago") so the target can avoid doing
967          * extra calls into the filesystem if that isn't necessary (e.g.
968          * during mount that would help a bit).  Having relative timestamps
969          * is not so great if request processing is slow, while absolute
970          * timestamps are not ideal because they need time synchronization. */
971         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
972
973         class_import_put(imp);
974
975         if (req == NULL)
976                 RETURN(-ENOMEM);
977
978         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
979         if (rc) {
980                 ptlrpc_request_free(req);
981                 RETURN(rc);
982         }
983         ptlrpc_request_set_replen(req);
984         req->rq_request_portal = OST_CREATE_PORTAL;
985         ptlrpc_at_set_req_timeout(req);
986
987         if (flags & OBD_STATFS_NODELAY) {
988                 /* procfs requests not want stat in wait for avoid deadlock */
989                 req->rq_no_resend = 1;
990                 req->rq_no_delay = 1;
991         }
992
993         rc = ptlrpc_queue_wait(req);
994         if (rc)
995                 GOTO(out, rc);
996
997         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
998         if (msfs == NULL)
999                 GOTO(out, rc = -EPROTO);
1000
1001         *osfs = *msfs;
1002
1003         EXIT;
1004 out:
1005         ptlrpc_req_finished(req);
1006         return rc;
1007 }
1008
1009 static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
1010                             enum obd_import_event event)
1011 {
1012         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
1013
1014         switch (event) {
1015         case IMP_EVENT_DISCON:
1016                 d->opd_got_disconnected = 1;
1017                 d->opd_imp_connected = 0;
1018                 if (d->opd_connect_mdt)
1019                         break;
1020                 osp_pre_update_status(d, -ENODEV);
1021                 cfs_waitq_signal(&d->opd_pre_waitq);
1022                 CDEBUG(D_HA, "got disconnected\n");
1023                 break;
1024         case IMP_EVENT_INACTIVE:
1025                 d->opd_imp_active = 0;
1026                 if (d->opd_connect_mdt)
1027                         break;
1028                 osp_pre_update_status(d, -ENODEV);
1029                 cfs_waitq_signal(&d->opd_pre_waitq);
1030                 CDEBUG(D_HA, "got inactive\n");
1031                 break;
1032         case IMP_EVENT_ACTIVE:
1033                 d->opd_imp_active = 1;
1034                 if (d->opd_got_disconnected)
1035                         d->opd_new_connection = 1;
1036                 d->opd_imp_connected = 1;
1037                 d->opd_imp_seen_connected = 1;
1038                 if (d->opd_connect_mdt)
1039                         break;
1040                 cfs_waitq_signal(&d->opd_pre_waitq);
1041                 __osp_sync_check_for_work(d);
1042                 CDEBUG(D_HA, "got connected\n");
1043                 break;
1044         case IMP_EVENT_INVALIDATE:
1045                 if (obd->obd_namespace == NULL)
1046                         break;
1047                 ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
1048                 break;
1049         case IMP_EVENT_OCD:
1050         case IMP_EVENT_DEACTIVATE:
1051         case IMP_EVENT_ACTIVATE:
1052                 break;
1053         default:
1054                 CERROR("%s: unsupported import event: %#x\n",
1055                        obd->obd_name, event);
1056         }
1057         return 0;
1058 }
1059
1060 static int osp_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1061                          void *karg, void *uarg)
1062 {
1063         struct obd_device       *obd = exp->exp_obd;
1064         struct osp_device       *d;
1065         struct obd_ioctl_data   *data = karg;
1066         int                      rc = 0;
1067
1068         ENTRY;
1069
1070         LASSERT(obd->obd_lu_dev);
1071         d = lu2osp_dev(obd->obd_lu_dev);
1072         LASSERT(d->opd_dt_dev.dd_ops == &osp_dt_ops);
1073
1074         if (!cfs_try_module_get(THIS_MODULE)) {
1075                 CERROR("%s: can't get module. Is it alive?", obd->obd_name);
1076                 return -EINVAL;
1077         }
1078
1079         switch (cmd) {
1080         case OBD_IOC_CLIENT_RECOVER:
1081                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
1082                                            data->ioc_inlbuf1, 0);
1083                 if (rc > 0)
1084                         rc = 0;
1085                 break;
1086         case IOC_OSC_SET_ACTIVE:
1087                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
1088                                               data->ioc_offset);
1089                 break;
1090         case OBD_IOC_PING_TARGET:
1091                 rc = ptlrpc_obd_ping(obd);
1092                 break;
1093         default:
1094                 CERROR("%s: unrecognized ioctl %#x by %s\n", obd->obd_name,
1095                        cmd, cfs_curproc_comm());
1096                 rc = -ENOTTY;
1097         }
1098         cfs_module_put(THIS_MODULE);
1099         return rc;
1100 }
1101
1102 static int osp_obd_health_check(const struct lu_env *env,
1103                                 struct obd_device *obd)
1104 {
1105         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
1106
1107         ENTRY;
1108
1109         /*
1110          * 1.8/2.0 behaviour is that OST being connected once at least
1111          * is considired "healthy". and one "healty" OST is enough to
1112          * allow lustre clients to connect to MDS
1113          */
1114         LASSERT(d);
1115         RETURN(!d->opd_imp_seen_connected);
1116 }
1117
/* context key constructor/destructor: osp_key_init, osp_key_fini */
LU_KEY_INIT_FINI(osp, struct osp_thread_info);
1120 static void osp_key_exit(const struct lu_context *ctx,
1121                          struct lu_context_key *key, void *data)
1122 {
1123         struct osp_thread_info *info = data;
1124
1125         info->osi_attr.la_valid = 0;
1126 }
1127
/*
 * Context key for struct osp_thread_info, tagged LCT_MD_THREAD.
 * osp_key_exit is installed as the exit hook in addition to the
 * init/fini pair.
 */
struct lu_context_key osp_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = osp_key_init,
        .lct_fini = osp_key_fini,
        .lct_exit = osp_key_exit
};
1134
/* context key constructor/destructor: osp_txn_key_init, osp_txn_key_fini */
LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
1137
/* Context key for struct osp_txn_info, tagged LCT_OSP_THREAD. */
struct lu_context_key osp_txn_key = {
        .lct_tags = LCT_OSP_THREAD,
        .lct_init = osp_txn_key_init,
        .lct_fini = osp_txn_key_fini
};
/* NOTE(review): presumably defines the osp_type_init/fini/start/stop
 * hooks used by osp_device_type_ops, handling both keys - confirm
 * against the LU_TYPE_INIT_FINI definition */
LU_TYPE_INIT_FINI(osp, &osp_thread_key, &osp_txn_key);
1144
/* Device type operations of the OSP: type-wide init/fini and start/stop
 * hooks plus per-device allocation, release and finalization. */
static struct lu_device_type_operations osp_device_type_ops = {
        .ldto_init           = osp_type_init,
        .ldto_fini           = osp_type_fini,

        .ldto_start          = osp_type_start,
        .ldto_stop           = osp_type_stop,

        .ldto_device_alloc   = osp_device_alloc,
        .ldto_device_free    = osp_device_free,

        .ldto_device_fini    = osp_device_fini
};
1157
/* The OSP device type: tagged LU_DEVICE_DT, named LUSTRE_OSP_NAME and
 * passed to class_register_type() in osp_mod_init(). */
static struct lu_device_type osp_device_type = {
        .ldt_tags     = LU_DEVICE_DT,
        .ldt_name     = LUSTRE_OSP_NAME,
        .ldt_ops      = &osp_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD
};
1164
/* OBD operations of the OSP.  Connection management, health check,
 * import events, ioctl and statfs are implemented in this file; the
 * add_conn/del_conn and fid_init/fini entries point at the generic
 * client_* helpers. */
static struct obd_ops osp_obd_device_ops = {
        .o_owner        = THIS_MODULE,
        .o_add_conn     = client_import_add_conn,
        .o_del_conn     = client_import_del_conn,
        .o_reconnect    = osp_reconnect,
        .o_connect      = osp_obd_connect,
        .o_disconnect   = osp_obd_disconnect,
        .o_health_check = osp_obd_health_check,
        .o_import_event = osp_import_event,
        .o_iocontrol    = osp_iocontrol,
        .o_statfs       = osp_obd_statfs,
        .o_fid_init     = client_fid_init,
        .o_fid_fini     = client_fid_fini,
};
1179
/* llog operations filled in by osp_mod_init(): a copy of llog_osd_ops
 * whose lop_add/lop_declare_add are replaced by the llog_cat_* variants */
struct llog_operations osp_mds_ost_orig_logops;
1181
1182 static int __init osp_mod_init(void)
1183 {
1184         struct lprocfs_static_vars       lvars;
1185         cfs_proc_dir_entry_t            *osc_proc_dir;
1186         int                              rc;
1187
1188         rc = lu_kmem_init(osp_caches);
1189         if (rc)
1190                 return rc;
1191
1192         lprocfs_osp_init_vars(&lvars);
1193
1194         rc = class_register_type(&osp_obd_device_ops, NULL, lvars.module_vars,
1195                                  LUSTRE_OSP_NAME, &osp_device_type);
1196
1197         /* create "osc" entry in procfs for compatibility purposes */
1198         if (rc != 0) {
1199                 lu_kmem_fini(osp_caches);
1200                 return rc;
1201         }
1202
1203         /* Note: add_rec/delcare_add_rec will be only used by catalogs */
1204         osp_mds_ost_orig_logops = llog_osd_ops;
1205         osp_mds_ost_orig_logops.lop_add = llog_cat_add_rec;
1206         osp_mds_ost_orig_logops.lop_declare_add = llog_cat_declare_add_rec;
1207
1208         osc_proc_dir = lprocfs_srch(proc_lustre_root, "osc");
1209         if (osc_proc_dir == NULL) {
1210                 osc_proc_dir = lprocfs_register("osc", proc_lustre_root, NULL,
1211                                                 NULL);
1212                 if (IS_ERR(osc_proc_dir))
1213                         CERROR("osp: can't create compat entry \"osc\": %d\n",
1214                                (int) PTR_ERR(osc_proc_dir));
1215         }
1216         return rc;
1217 }
1218
/*
 * Module exit: tries to remove the "osc" procfs compatibility entry,
 * unregisters the OSP obd type and releases the slab caches set up by
 * osp_mod_init().
 */
static void __exit osp_mod_exit(void)
{
        lprocfs_try_remove_proc_entry("osc", proc_lustre_root);

        class_unregister_type(LUSTRE_OSP_NAME);
        lu_kmem_fini(osp_caches);
}
1226
/* Module metadata. */
MODULE_AUTHOR("Intel, Inc. <http://www.intel.com/>");
MODULE_DESCRIPTION("Lustre OST Proxy Device ("LUSTRE_OSP_NAME")");
MODULE_LICENSE("GPL");

/* NOTE(review): presumably wires osp_mod_init/osp_mod_exit up as the
 * module entry and exit points - confirm against the cfs_module macro */
cfs_module(osp, LUSTRE_VERSION_STRING, osp_mod_init, osp_mod_exit);