Whamcloud - gitweb
LU-5508 osp: RPC adjustment for remote transaction
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * OST/MDT proxy device (OSP) Metadata methods
29  *
30  * This file implements methods for remote MD object, which include
31  * dt_object_operations, dt_index_operations and dt_body_operations.
32  *
33  * If there are multiple MDTs in one filesystem, one operation might
34  * include modifications in several MDTs. In such cases, clients
35  * send the RPC to the master MDT, then the operation is decomposed into
36  * object updates which will be dispatched to OSD or OSP. The local updates
37  * go to local OSD and the remote updates go to OSP. In OSP, these remote
38  * object updates will be packed into an update RPC, sent to the remote MDT
39  * and handled by Object Update Target (OUT).
40  *
41  * In DNE phase I, because of missing complete recovery solution, updates
42  * will be executed in order and synchronously.
43  *     1. The transaction is created.
44  *     2. In transaction declare, it collects and packs remote
45  *        updates (in osp_md_declare_xxx()).
46  *     3. In transaction start, it sends these remote updates
47  *        to remote MDTs, which will execute these updates synchronously.
48  *     4. In transaction execute phase, the local updates will be executed
49  *        synchronously.
50  *
51  * Author: Di Wang <di.wang@intel.com>
52  */
53
54 #define DEBUG_SUBSYSTEM S_MDS
55
56 #include <lustre_log.h>
57 #include "osp_internal.h"
58
59 static const char dot[] = ".";
60 static const char dotdot[] = "..";
61
62 /**
63  * Implementation of dt_object_operations::do_declare_create
64  *
65  * Insert object create update into the RPC, which will be sent during
66  * transaction start. Note: if the object has already been created,
67  * we must add object destroy updates ahead of create updates, so it will
68  * destroy then recreate the object.
69  *
70  * \param[in] env       execution environment
71  * \param[in] dt        remote object to be created
72  * \param[in] attr      attribute of the created object
73  * \param[in] hint      creation hint
74  * \param[in] dof       creation format information
75  * \param[in] th        the transaction handle
76  *
77  * \retval              0 if the insertion succeeds.
78  * \retval              negative errno if the insertion fails.
79  */
80 int osp_md_declare_object_create(const struct lu_env *env,
81                                  struct dt_object *dt,
82                                  struct lu_attr *attr,
83                                  struct dt_allocation_hint *hint,
84                                  struct dt_object_format *dof,
85                                  struct thandle *th)
86 {
87         struct osp_thread_info          *osi = osp_env_info(env);
88         struct dt_update_request        *update;
89         struct lu_fid                   *fid1;
90         int                             sizes[2] = {sizeof(struct obdo), 0};
91         char                            *bufs[2] = {NULL, NULL};
92         int                             buf_count;
93         int                             rc;
94
95         update = out_find_create_update_loc(th, dt);
96         if (IS_ERR(update)) {
97                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
98                        dt->do_lu.lo_dev->ld_obd->obd_name,
99                        (int)PTR_ERR(update));
100                 return PTR_ERR(update);
101         }
102
103         osi->osi_obdo.o_valid = 0;
104         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
105         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
106
107         bufs[0] = (char *)&osi->osi_obdo;
108         buf_count = 1;
109         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
110         if (hint != NULL && hint->dah_parent) {
111                 struct lu_fid *fid2;
112
113                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
114                 sizes[1] = sizeof(*fid2);
115                 bufs[1] = (char *)fid2;
116                 buf_count++;
117         }
118
119         if (lu_object_exists(&dt->do_lu)) {
120                 /* If the object already exists, we needs to destroy
121                  * this orphan object first.
122                  *
123                  * The scenario might happen in this case
124                  *
125                  * 1. client send remote create to MDT0.
126                  * 2. MDT0 send create update to MDT1.
127                  * 3. MDT1 finished create synchronously.
128                  * 4. MDT0 failed and reboot.
129                  * 5. client resend remote create to MDT0.
130                  * 6. MDT0 tries to resend create update to MDT1,
131                  *    but find the object already exists
132                  */
133                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
134                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
135
136                 rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
137                                        NULL, NULL);
138                 if (rc != 0)
139                         GOTO(out, rc);
140
141                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
142                         /* decrease for ".." */
143                         rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
144                                                0, NULL, NULL);
145                         if (rc != 0)
146                                 GOTO(out, rc);
147                 }
148
149                 rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
150                                        NULL);
151                 if (rc != 0)
152                         GOTO(out, rc);
153
154                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
155                 /* Increase batchid to add this orphan object deletion
156                  * to separate transaction */
157                 update_inc_batchid(update);
158         }
159
160         rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
161                                (const char **)bufs);
162 out:
163         if (rc)
164                 CERROR("%s: Insert update error: rc = %d\n",
165                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
166
167         return rc;
168 }
169
170 /**
171  * Implementation of dt_object_operations::do_create
172  *
173  * It sets necessary flags for created object. In DNE phase I,
174  * remote updates are actually executed during transaction start,
175  * i.e. the object has already been created when calling this method.
176  *
177  * \param[in] env       execution environment
178  * \param[in] dt        object to be created
179  * \param[in] attr      attribute of the created object
180  * \param[in] hint      creation hint
181  * \param[in] dof       creation format information
182  * \param[in] th        the transaction handle
183  *
184  * \retval              only return 0 for now
185  */
186 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
187                          struct lu_attr *attr, struct dt_allocation_hint *hint,
188                          struct dt_object_format *dof, struct thandle *th)
189 {
190         CDEBUG(D_INFO, "create object "DFID"\n",
191                PFID(&dt->do_lu.lo_header->loh_fid));
192
193         /* Because the create update RPC will be sent during declare phase,
194          * if creation reaches here, it means the object has been created
195          * successfully */
196         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
197         dt2osp_obj(dt)->opo_non_exist = 0;
198
199         return 0;
200 }
201
202 /**
203  * Implementation of dt_object_operations::do_declare_ref_del
204  *
205  * Declare decreasing the reference count of the remote object, i.e. insert
206  * decreasing object reference count update into the RPC, which will be sent
207  * during transaction start.
208  *
209  * \param[in] env       execution environment
210  * \param[in] dt        object to decrease the reference count.
211  * \param[in] th        the transaction handle of refcount decrease.
212  *
213  * \retval              0 if the insertion succeeds.
214  * \retval              negative errno if the insertion fails.
215  */
216 static int osp_md_declare_object_ref_del(const struct lu_env *env,
217                                          struct dt_object *dt,
218                                          struct thandle *th)
219 {
220         struct dt_update_request        *update;
221         struct lu_fid                   *fid;
222         int                             rc;
223
224         update = out_find_create_update_loc(th, dt);
225         if (IS_ERR(update)) {
226                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
227                        dt->do_lu.lo_dev->ld_obd->obd_name,
228                       (int)PTR_ERR(update));
229                 return PTR_ERR(update);
230         }
231
232         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
233
234         rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
235
236         return rc;
237 }
238
239 /**
240  * Implementation of dt_object_operations::do_ref_del
241  *
242  * Do nothing in this method for now. In DNE phase I, remote updates are
243  * actually executed during transaction start, i.e. the object reference
244  * count has already been decreased when calling this method.
245  *
246  * \param[in] env       execution environment
247  * \param[in] dt        object to decrease the reference count
248  * \param[in] th        the transaction handle
249  *
250  * \retval              only return 0 for now
251  */
252 static int osp_md_object_ref_del(const struct lu_env *env,
253                                  struct dt_object *dt,
254                                  struct thandle *th)
255 {
256         CDEBUG(D_INFO, "ref del object "DFID"\n",
257                PFID(&dt->do_lu.lo_header->loh_fid));
258
259         return 0;
260 }
261
262 /**
263  * Implementation of dt_object_operations::do_declare_ref_del
264  *
265  * Declare increasing the reference count of the remote object,
266  * i.e. insert increasing object reference count update into RPC.
267  *
268  * \param[in] env       execution environment
269  * \param[in] dt        object on which to increase the reference count.
270  * \param[in] th        the transaction handle.
271  *
272  * \retval              0 if the insertion succeeds.
273  * \retval              negative errno if the insertion fails.
274  */
275 static int osp_md_declare_ref_add(const struct lu_env *env,
276                                   struct dt_object *dt, struct thandle *th)
277 {
278         struct dt_update_request        *update;
279         struct lu_fid                   *fid;
280         int                             rc;
281
282         update = out_find_create_update_loc(th, dt);
283         if (IS_ERR(update)) {
284                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
285                        dt->do_lu.lo_dev->ld_obd->obd_name,
286                        (int)PTR_ERR(update));
287                 return PTR_ERR(update);
288         }
289
290         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
291
292         rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
293
294         return rc;
295 }
296
297 /**
298  * Implementation of dt_object_operations::do_ref_add
299  *
300  * Do nothing in this method for now. In DNE phase I, remote updates are
301  * actually executed during transaction start, i.e. the object reference
302  * count has already been increased when calling this method.
303  *
304  * \param[in] env       execution environment
305  * \param[in] dt        object on which to increase the reference count
306  * \param[in] th        the transaction handle
307  *
308  * \retval              only return 0 for now
309  */
310 static int osp_md_object_ref_add(const struct lu_env *env, struct dt_object *dt,
311                                  struct thandle *th)
312 {
313         CDEBUG(D_INFO, "ref add object "DFID"\n",
314                PFID(&dt->do_lu.lo_header->loh_fid));
315
316         return 0;
317 }
318
319 /**
320  * Implementation of dt_object_operations::do_ah_init
321  *
322  * Initialize the allocation hint for object creation, which is usually called
323  * before the creation, and these hints (parent and child mode) will be sent to
324  * the remote Object Update Target (OUT) and used in the object create process,
325  * same as OSD object creation.
326  *
327  * \param[in] env       execution environment
328  * \param[in] ah        the hint to be initialized
329  * \param[in] parent    the parent of the object
330  * \param[in] child     the object to be created
331  * \param[in] child_mode the mode of the created object
332  */
333 static void osp_md_ah_init(const struct lu_env *env,
334                            struct dt_allocation_hint *ah,
335                            struct dt_object *parent,
336                            struct dt_object *child,
337                            umode_t child_mode)
338 {
339         LASSERT(ah);
340
341         ah->dah_parent = parent;
342         ah->dah_mode = child_mode;
343 }
344
345 /**
346  * Implementation of dt_object_operations::do_declare_attr_get
347  *
348  * Declare setting attributes of the remote object, i.e. insert remote
349  * object attr_set update into RPC.
350  *
351  * \param[in] env       execution environment
352  * \param[in] dt        object on which to set attributes
353  * \param[in] attr      attributes to be set
354  * \param[in] th        the transaction handle
355  *
356  * \retval              0 if the insertion succeeds.
357  * \retval              negative errno if the insertion fails.
358  */
359 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
360                             const struct lu_attr *attr, struct thandle *th)
361 {
362         struct osp_thread_info          *osi = osp_env_info(env);
363         struct dt_update_request        *update;
364         struct lu_fid                   *fid;
365         int                             size = sizeof(struct obdo);
366         char                            *buf;
367         int                             rc;
368
369         update = out_find_create_update_loc(th, dt);
370         if (IS_ERR(update)) {
371                 CERROR("%s: Get OSP update buf failed: %d\n",
372                        dt->do_lu.lo_dev->ld_obd->obd_name,
373                        (int)PTR_ERR(update));
374                 return PTR_ERR(update);
375         }
376
377         osi->osi_obdo.o_valid = 0;
378         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
379                      attr->la_valid);
380         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
381
382         buf = (char *)&osi->osi_obdo;
383         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
384
385         rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
386                                (const char **)&buf);
387
388         return rc;
389 }
390
391 /**
392  * Implementation of dt_object_operations::do_attr_set
393  *
394  * Do nothing in this method for now. In DNE phase I, remote updates
395  * are actually executed during transaction start, i.e. object attributes
396  * have already been set when calling this method.
397  *
398  * \param[in] env       execution environment
399  * \param[in] dt        object to set attributes
400  * \param[in] attr      attributes to be set
401  * \param[in] th        the transaction handle
402  * \param[in] capa      capability of setting attributes (not yet implemented).
403  *
404  * \retval              only return 0 for now
405  */
406 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
407                     const struct lu_attr *attr, struct thandle *th,
408                     struct lustre_capa *capa)
409 {
410         CDEBUG(D_INFO, "attr set object "DFID"\n",
411                PFID(&dt->do_lu.lo_header->loh_fid));
412
413         RETURN(0);
414 }
415
416 /**
417  * Implementation of dt_object_operations::do_read_lock
418  *
419  * osp_md_object_{read,write}_lock() will only lock the remote object in the
420  * local cache, which uses the semaphore (opo_sem) inside the osp_object to
421  * lock the object. Note: it will not lock the object in the whole cluster,
422  * which relies on the LDLM lock.
423  *
424  * \param[in] env       execution environment
425  * \param[in] dt        object to be locked
426  * \param[in] role      lock role from MDD layer, see mdd_object_role().
427  */
428 static void osp_md_object_read_lock(const struct lu_env *env,
429                                     struct dt_object *dt, unsigned role)
430 {
431         struct osp_object  *obj = dt2osp_obj(dt);
432
433         LASSERT(obj->opo_owner != env);
434         down_read_nested(&obj->opo_sem, role);
435
436         LASSERT(obj->opo_owner == NULL);
437 }
438
439 /**
440  * Implementation of dt_object_operations::do_write_lock
441  *
442  * Lock the remote object in write mode.
443  *
444  * \param[in] env       execution environment
445  * \param[in] dt        object to be locked
446  * \param[in] role      lock role from MDD layer, see mdd_object_role().
447  */
448 static void osp_md_object_write_lock(const struct lu_env *env,
449                                      struct dt_object *dt, unsigned role)
450 {
451         struct osp_object *obj = dt2osp_obj(dt);
452
453         down_write_nested(&obj->opo_sem, role);
454
455         LASSERT(obj->opo_owner == NULL);
456         obj->opo_owner = env;
457 }
458
459 /**
460  * Implementation of dt_object_operations::do_read_unlock
461  *
462  * Unlock the read lock of remote object.
463  *
464  * \param[in] env       execution environment
465  * \param[in] dt        object to be unlocked
466  */
467 static void osp_md_object_read_unlock(const struct lu_env *env,
468                                       struct dt_object *dt)
469 {
470         struct osp_object *obj = dt2osp_obj(dt);
471
472         up_read(&obj->opo_sem);
473 }
474
475 /**
476  * Implementation of dt_object_operations::do_write_unlock
477  *
478  * Unlock the write lock of remote object.
479  *
480  * \param[in] env       execution environment
481  * \param[in] dt        object to be unlocked
482  */
483 static void osp_md_object_write_unlock(const struct lu_env *env,
484                                        struct dt_object *dt)
485 {
486         struct osp_object *obj = dt2osp_obj(dt);
487
488         LASSERT(obj->opo_owner == env);
489         obj->opo_owner = NULL;
490         up_write(&obj->opo_sem);
491 }
492
493 /**
494  * Implementation of dt_object_operations::do_write_locked
495  *
496  * Test if the object is locked in write mode.
497  *
498  * \param[in] env       execution environment
499  * \param[in] dt        object to be tested
500  */
501 static int osp_md_object_write_locked(const struct lu_env *env,
502                                       struct dt_object *dt)
503 {
504         struct osp_object *obj = dt2osp_obj(dt);
505
506         return obj->opo_owner == env;
507 }
508
509 /**
510  * Implementation of dt_index_operations::dio_lookup
511  *
512  * Look up record by key under a remote index object. It packs lookup update
513  * into RPC, sends to the remote OUT and waits for the lookup result.
514  *
515  * \param[in] env       execution environment
516  * \param[in] dt        index object to lookup
517  * \param[out] rec      record in which to return lookup result
518  * \param[in] key       key of index which will be looked up
519  * \param[in] capa      capability of lookup (not yet implemented)
520  *
521  * \retval              1 if the lookup succeeds.
522  * \retval              negative errno if the lookup fails.
523  */
524 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
525                                struct dt_rec *rec, const struct dt_key *key,
526                                struct lustre_capa *capa)
527 {
528         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
529         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
530         struct dt_device        *dt_dev = &osp->opd_dt_dev;
531         struct dt_update_request   *update;
532         struct object_update_reply *reply;
533         struct ptlrpc_request      *req = NULL;
534         int                        size = strlen((char *)key) + 1;
535         struct lu_fid              *fid;
536         int                        rc;
537         ENTRY;
538
539         /* Because it needs send the update buffer right away,
540          * just create an update buffer, instead of attaching the
541          * update_remote list of the thandle.
542          */
543         update = out_create_update_req(dt_dev);
544         if (IS_ERR(update))
545                 RETURN(PTR_ERR(update));
546
547         rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
548                                lu_object_fid(&dt->do_lu),
549                                1, &size, (const char **)&key);
550         if (rc) {
551                 CERROR("%s: Insert update error: rc = %d\n",
552                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
553                 GOTO(out, rc);
554         }
555
556         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
557         if (rc < 0)
558                 GOTO(out, rc);
559
560         reply = req_capsule_server_sized_get(&req->rq_pill,
561                                              &RMF_OUT_UPDATE_REPLY,
562                                              OUT_UPDATE_REPLY_SIZE);
563         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
564                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
565                        dt_dev->dd_lu_dev.ld_obd->obd_name,
566                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
567                 GOTO(out, rc = -EPROTO);
568         }
569
570         rc = object_update_result_data_get(reply, lbuf, 0);
571         if (rc < 0)
572                 GOTO(out, rc);
573
574         if (lbuf->lb_len != sizeof(*fid)) {
575                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
576                        dt_dev->dd_lu_dev.ld_obd->obd_name,
577                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
578                        (int)lbuf->lb_len);
579                 GOTO(out, rc = -EINVAL);
580         }
581
582         fid = lbuf->lb_buf;
583         if (ptlrpc_rep_need_swab(req))
584                 lustre_swab_lu_fid(fid);
585         if (!fid_is_sane(fid)) {
586                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
587                        dt_dev->dd_lu_dev.ld_obd->obd_name,
588                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
589                 GOTO(out, rc = -EINVAL);
590         }
591
592         memcpy(rec, fid, sizeof(*fid));
593
594         GOTO(out, rc = 1);
595
596 out:
597         if (req != NULL)
598                 ptlrpc_req_finished(req);
599
600         out_destroy_update_req(update);
601
602         return rc;
603 }
604
605 /**
606  * Implementation of dt_index_operations::dio_declare_insert
607  *
608  * Declare the index insert of the remote object, i.e. pack index insert update
609  * into the RPC, which will be sent during transaction start.
610  *
611  * \param[in] env       execution environment
612  * \param[in] dt        object for which to insert index
613  * \param[in] rec       record of the index which will be inserted
614  * \param[in] key       key of the index which will be inserted
615  * \param[in] th        the transaction handle
616  *
617  * \retval              0 if the insertion succeeds.
618  * \retval              negative errno if the insertion fails.
619  */
620 static int osp_md_declare_insert(const struct lu_env *env,
621                                  struct dt_object *dt,
622                                  const struct dt_rec *rec,
623                                  const struct dt_key *key,
624                                  struct thandle *th)
625 {
626         struct osp_thread_info     *info = osp_env_info(env);
627         struct dt_update_request   *update;
628         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
629         struct lu_fid              *fid =
630                                 (struct lu_fid *)lu_object_fid(&dt->do_lu);
631         struct lu_fid              *rec_fid = &info->osi_fid;
632         __u32                       type = cpu_to_le32(rec1->rec_type);
633         int                         size[3] = { strlen((char *)key) + 1,
634                                                 sizeof(*rec_fid),
635                                                 sizeof(type) };
636         const char                 *bufs[3] = { (char *)key,
637                                                 (char *)rec_fid,
638                                                 (char *)&type };
639         int                         rc;
640
641         update = out_find_create_update_loc(th, dt);
642         if (IS_ERR(update)) {
643                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
644                        dt->do_lu.lo_dev->ld_obd->obd_name,
645                        (int)PTR_ERR(update));
646                 return PTR_ERR(update);
647         }
648
649         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n",
650                dt->do_lu.lo_dev->ld_obd->obd_name,
651                PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type);
652
653         fid_cpu_to_le(rec_fid, rec1->rec_fid);
654         rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
655                                ARRAY_SIZE(size), size, bufs);
656         return rc;
657 }
658
659 /**
660  * Implementation of dt_index_operations::dio_insert
661  *
662  * Do nothing in this method for now. In DNE phase I, remote updates
663  * are actually executed during transaction start, i.e. the index has
664  * already been inserted when calling this method.
665  *
666  * \param[in] env       execution environment
667  * \param[in] dt        object for which to insert index
668  * \param[in] rec       record of the index to be inserted
669  * \param[in] key       key of the index to be inserted
670  * \param[in] th        the transaction handle
671  * \param[in] capa      capability of insert (not yet implemented)
672  * \param[in] ignore_quota quota enforcement for insert
673  *
674  * \retval              only return 0 for now
675  */
676 static int osp_md_index_insert(const struct lu_env *env,
677                                struct dt_object *dt,
678                                const struct dt_rec *rec,
679                                const struct dt_key *key,
680                                struct thandle *th,
681                                struct lustre_capa *capa,
682                                int ignore_quota)
683 {
684         return 0;
685 }
686
687 /**
688  * Implementation of dt_index_operations::dio_declare_delete
689  *
690  * Declare the index delete of the remote object, i.e. insert index delete
691  * update into the RPC, which will be sent during transaction start.
692  *
693  * \param[in] env       execution environment
694  * \param[in] dt        object for which to delete index
695  * \param[in] key       key of the index
696  * \param[in] th        the transaction handle
697  *
698  * \retval              0 if the insertion succeeds.
699  * \retval              negative errno if the insertion fails.
700  */
701 static int osp_md_declare_delete(const struct lu_env *env,
702                                  struct dt_object *dt,
703                                  const struct dt_key *key,
704                                  struct thandle *th)
705 {
706         struct dt_update_request *update;
707         struct lu_fid *fid;
708         int size = strlen((char *)key) + 1;
709         int rc;
710
711         update = out_find_create_update_loc(th, dt);
712         if (IS_ERR(update)) {
713                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
714                        dt->do_lu.lo_dev->ld_obd->obd_name,
715                        (int)PTR_ERR(update));
716                 return PTR_ERR(update);
717         }
718
719         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
720
721         rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
722                                (const char **)&key);
723
724         return rc;
725 }
726
727 /**
728  * Implementation of dt_index_operations::dio_delete
729  *
730  * Do nothing in this method for now. Because in DNE phase I, remote updates
731  * are actually executed during transaction start, i.e. the index has already
732  * been deleted when calling this method.
733  *
734  * \param[in] env       execution environment
735  * \param[in] dt        object for which to delete index
736  * \param[in] key       key of the index which will be deleted
737  * \param[in] th        the transaction handle
738  * \param[in] capa      capability of delete (not yet implemented)
739  *
740  * \retval              only return 0 for now
741  */
742 static int osp_md_index_delete(const struct lu_env *env,
743                                struct dt_object *dt,
744                                const struct dt_key *key,
745                                struct thandle *th,
746                                struct lustre_capa *capa)
747 {
748         CDEBUG(D_INFO, "index delete "DFID" %s\n",
749                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
750
751         return 0;
752 }
753
754 /**
755  * Implementation of dt_index_operations::dio_it.next
756  *
757  * Advance the pointer of the iterator to the next entry. It shares a similar
758  * internal implementation with osp_orphan_it_next(), which is being used for
759  * remote orphan index object. This method will be used for remote directory.
760  *
761  * \param[in] env       execution environment
762  * \param[in] di        iterator of this iteration
763  *
764  * \retval              0 if the pointer is advanced successfuly.
765  * \retval              1 if it reaches to the end of the index object.
766  * \retval              negative errno if the pointer cannot be advanced.
767  */
768 int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
769 {
770         struct osp_it           *it = (struct osp_it *)di;
771         struct lu_idxpage       *idxpage;
772         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
773         int                     rc;
774         ENTRY;
775
776 again:
777         idxpage = it->ooi_cur_idxpage;
778         if (idxpage != NULL) {
779                 if (idxpage->lip_nr == 0)
780                         RETURN(1);
781
782                 it->ooi_pos_ent++;
783                 if (ent == NULL) {
784                         it->ooi_ent =
785                               (struct lu_dirent *)idxpage->lip_entries;
786                         RETURN(0);
787                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
788                            it->ooi_pos_ent < idxpage->lip_nr) {
789                         ent = (struct lu_dirent *)(((char *)ent) +
790                                         le16_to_cpu(ent->lde_reclen));
791                         it->ooi_ent = ent;
792                         RETURN(0);
793                 } else {
794                         it->ooi_ent = NULL;
795                 }
796         }
797
798         rc = osp_it_next_page(env, di);
799         if (rc == 0)
800                 goto again;
801
802         RETURN(rc);
803 }
804
805 /**
806  * Implementation of dt_index_operations::dio_it.key
807  *
808  * Get the key at current iterator poisiton. These iteration methods
809  * (dio_it) will only be used for iterating the remote directory, so
810  * the key is the name of the directory entry.
811  *
812  * \param[in] env       execution environment
813  * \param[in] di        iterator of this iteration
814  *
815  * \retval              name of the current entry
816  */
817 static struct dt_key *osp_it_key(const struct lu_env *env,
818                                  const struct dt_it *di)
819 {
820         struct osp_it           *it = (struct osp_it *)di;
821         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
822
823         return (struct dt_key *)ent->lde_name;
824 }
825
826 /**
827  * Implementation of dt_index_operations::dio_it.key_size
828  *
829  * Get the key size at current iterator poisiton. These iteration methods
830  * (dio_it) will only be used for iterating the remote directory, so the key
831  * size is the name size of the directory entry.
832  *
833  * \param[in] env       execution environment
834  * \param[in] di        iterator of this iteration
835  *
836  * \retval              name size of the current entry
837  */
838
839 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
840 {
841         struct osp_it           *it = (struct osp_it *)di;
842         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
843
844         return (int)le16_to_cpu(ent->lde_namelen);
845 }
846
847 /**
848  * Implementation of dt_index_operations::dio_it.rec
849  *
850  * Get the record at current iterator position. These iteration methods
851  * (dio_it) will only be used for iterating the remote directory, so it
852  * uses lu_dirent_calc_size() to calculate the record size.
853  *
854  * \param[in] env       execution environment
855  * \param[in] di        iterator of this iteration
856  * \param[out] rec      the record to be returned
857  * \param[in] attr      attributes of the index object, so it knows
858  *                      how to pack the entry.
859  *
860  * \retval              only return 0 for now
861  */
862 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
863                                struct dt_rec *rec, __u32 attr)
864 {
865         struct osp_it           *it = (struct osp_it *)di;
866         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
867         int                     reclen;
868
869         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
870         memcpy(rec, ent, reclen);
871         return 0;
872 }
873
874 /**
875  * Implementation of dt_index_operations::dio_it.load
876  *
877  * Locate the iteration cursor to the specified position (cookie).
878  *
879  * \param[in] env       pointer to the thread context
880  * \param[in] di        pointer to the iteration structure
881  * \param[in] hash      the specified position
882  *
883  * \retval              positive number for locating to the exactly position
884  *                      or the next
885  * \retval              0 for arriving at the end of the iteration
886  * \retval              negative error number on failure
887  */
888 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
889                        __u64 hash)
890 {
891         struct osp_it   *it     = (struct osp_it *)di;
892         int              rc;
893
894         it->ooi_next = hash;
895         rc = osp_md_index_it_next(env, (struct dt_it *)di);
896         if (rc == 1)
897                 return 0;
898
899         if (rc == 0)
900                 return 1;
901
902         return rc;
903 }
904
905 const struct dt_index_operations osp_md_index_ops = {
906         .dio_lookup         = osp_md_index_lookup,
907         .dio_declare_insert = osp_md_declare_insert,
908         .dio_insert         = osp_md_index_insert,
909         .dio_declare_delete = osp_md_declare_delete,
910         .dio_delete         = osp_md_index_delete,
911         .dio_it     = {
912                 .init     = osp_it_init,
913                 .fini     = osp_it_fini,
914                 .get      = osp_it_get,
915                 .put      = osp_it_put,
916                 .next     = osp_md_index_it_next,
917                 .key      = osp_it_key,
918                 .key_size = osp_it_key_size,
919                 .rec      = osp_md_index_it_rec,
920                 .store    = osp_it_store,
921                 .load     = osp_it_load,
922                 .key_rec  = osp_it_key_rec,
923         }
924 };
925
926 /**
927  * Implementation of dt_object_operations::do_index_try
928  *
929  * Try to initialize the index API pointer for the given object. This
930  * is the entry point of the index API, i.e. we must call this method
931  * to initialize the index object before calling other index methods.
932  *
933  * \param[in] env       execution environment
934  * \param[in] dt        index object to be initialized
935  * \param[in] feat      the index feature of the object
936  *
937  * \retval              0 if the initialization succeeds.
938  * \retval              negative errno if the initialization fails.
939  */
940 static int osp_md_index_try(const struct lu_env *env,
941                             struct dt_object *dt,
942                             const struct dt_index_features *feat)
943 {
944         dt->do_index_ops = &osp_md_index_ops;
945         return 0;
946 }
947
948 /**
949  * Implementation of dt_object_operations::do_object_lock
950  *
951  * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT,
952  * which will lock the object in the global namespace.
953  *
954  * \param[in] env       execution environment
955  * \param[in] dt        object to be locked
956  * \param[out] lh       lock handle
957  * \param[in] einfo     enqueue information
958  * \param[in] policy    lock policy
959  *
960  * \retval              ELDLM_OK if locking the object succeeds.
961  * \retval              negative errno if locking fails.
962  */
963 static int osp_md_object_lock(const struct lu_env *env,
964                               struct dt_object *dt,
965                               struct lustre_handle *lh,
966                               struct ldlm_enqueue_info *einfo,
967                               ldlm_policy_data_t *policy)
968 {
969         struct ldlm_res_id      *res_id;
970         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
971         struct osp_device       *osp = dt2osp_dev(dt_dev);
972         struct ptlrpc_request   *req;
973         int                     rc = 0;
974         __u64                   flags = 0;
975         ldlm_mode_t             mode;
976
977         res_id = einfo->ei_res_id;
978         LASSERT(res_id != NULL);
979
980         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
981                                LDLM_FL_BLOCK_GRANTED, res_id,
982                                einfo->ei_type, policy,
983                                einfo->ei_mode, lh, 0);
984         if (mode > 0)
985                 return ELDLM_OK;
986
987         req = ldlm_enqueue_pack(osp->opd_exp, 0);
988         if (IS_ERR(req))
989                 RETURN(PTR_ERR(req));
990
991         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
992                               (const ldlm_policy_data_t *)policy,
993                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
994
995         ptlrpc_req_finished(req);
996
997         return rc == ELDLM_OK ? 0 : -EIO;
998 }
999
1000 /**
1001  * Implementation of dt_object_operations::do_object_unlock
1002  *
1003  * Cancel a lock of a remote object.
1004  *
1005  * \param[in] env       execution environment
1006  * \param[in] dt        object to be unlocked
1007  * \param[in] einfo     lock enqueue information
1008  * \param[in] policy    lock policy
1009  *
1010  * \retval              Only return 0 for now.
1011  */
1012 static int osp_md_object_unlock(const struct lu_env *env,
1013                                 struct dt_object *dt,
1014                                 struct ldlm_enqueue_info *einfo,
1015                                 ldlm_policy_data_t *policy)
1016 {
1017         struct lustre_handle    *lockh = einfo->ei_cbdata;
1018
1019         /* unlock finally */
1020         ldlm_lock_decref(lockh, einfo->ei_mode);
1021
1022         return 0;
1023 }
1024
1025 struct dt_object_operations osp_md_obj_ops = {
1026         .do_read_lock         = osp_md_object_read_lock,
1027         .do_write_lock        = osp_md_object_write_lock,
1028         .do_read_unlock       = osp_md_object_read_unlock,
1029         .do_write_unlock      = osp_md_object_write_unlock,
1030         .do_write_locked      = osp_md_object_write_locked,
1031         .do_declare_create    = osp_md_declare_object_create,
1032         .do_create            = osp_md_object_create,
1033         .do_declare_ref_add   = osp_md_declare_ref_add,
1034         .do_ref_add           = osp_md_object_ref_add,
1035         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1036         .do_ref_del           = osp_md_object_ref_del,
1037         .do_declare_destroy   = osp_declare_object_destroy,
1038         .do_destroy           = osp_object_destroy,
1039         .do_ah_init           = osp_md_ah_init,
1040         .do_attr_get          = osp_attr_get,
1041         .do_declare_attr_set  = osp_md_declare_attr_set,
1042         .do_attr_set          = osp_md_attr_set,
1043         .do_xattr_get         = osp_xattr_get,
1044         .do_declare_xattr_set = osp_declare_xattr_set,
1045         .do_xattr_set         = osp_xattr_set,
1046         .do_declare_xattr_del = osp_declare_xattr_del,
1047         .do_xattr_del         = osp_xattr_del,
1048         .do_index_try         = osp_md_index_try,
1049         .do_object_lock       = osp_md_object_lock,
1050         .do_object_unlock     = osp_md_object_unlock,
1051 };
1052
1053 /**
1054  * Implementation of dt_body_operations::dbo_declare_write
1055  *
1056  * Declare an object write. In DNE phase I, it will pack the write
1057  * object update into the RPC.
1058  *
1059  * \param[in] env       execution environment
1060  * \param[in] dt        object to be written
1061  * \param[in] buf       buffer to write which includes an embedded size field
1062  * \param[in] pos       offet in the object to start writing at
1063  * \param[in] th        transaction handle
1064  *
1065  * \retval              0 if the insertion succeeds.
1066  * \retval              negative errno if the insertion fails.
1067  */
1068 static ssize_t osp_md_declare_write(const struct lu_env *env,
1069                                     struct dt_object *dt,
1070                                     const struct lu_buf *buf,
1071                                     loff_t pos, struct thandle *th)
1072 {
1073         struct dt_update_request  *update;
1074         struct lu_fid             *fid;
1075         int                       sizes[2] = {buf->lb_len, sizeof(pos)};
1076         const char                *bufs[2] = {(char *)buf->lb_buf,
1077                                               (char *)&pos};
1078         ssize_t                   rc;
1079
1080         update = out_find_create_update_loc(th, dt);
1081         if (IS_ERR(update)) {
1082                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
1083                        dt->do_lu.lo_dev->ld_obd->obd_name,
1084                        (int)PTR_ERR(update));
1085                 return PTR_ERR(update);
1086         }
1087
1088         pos = cpu_to_le64(pos);
1089         bufs[1] = (char *)&pos;
1090         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
1091         rc = out_insert_update(env, update, OUT_WRITE, fid,
1092                                ARRAY_SIZE(sizes), sizes, bufs);
1093
1094         return rc;
1095
1096 }
1097
1098 /**
1099  * Implementation of dt_body_operations::dbo_write
1100  *
1101  * Return the buffer size. In DNE phase I, remote updates
1102  * are actually executed during transaction start, the buffer has
1103  * already been written when this method is being called.
1104  *
1105  * \param[in] env       execution environment
1106  * \param[in] dt        object to be written
1107  * \param[in] buf       buffer to write which includes an embedded size field
1108  * \param[in] pos       offet in the object to start writing at
1109  * \param[in] th        transaction handle
1110  * \param[in] capa      capability of the write (not yet implemented)
1111  * \param[in] ignore_quota quota enforcement for this write
1112  *
1113  * \retval              the buffer size in bytes.
1114  */
1115 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
1116                             const struct lu_buf *buf, loff_t *pos,
1117                             struct thandle *handle,
1118                             struct lustre_capa *capa, int ignore_quota)
1119 {
1120         return buf->lb_len;
1121 }
1122
1123 /* These body operation will be used to write symlinks during migration etc */
1124 struct dt_body_operations osp_md_body_ops = {
1125         .dbo_declare_write      = osp_md_declare_write,
1126         .dbo_write              = osp_md_write,
1127 };