Whamcloud - gitweb
LU-3534 osp: move RPC pack from declare to execution phase
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * OST/MDT proxy device (OSP) Metadata methods
29  *
30  * This file implements methods for remote MD object, which include
31  * dt_object_operations, dt_index_operations and dt_body_operations.
32  *
33  * If there are multiple MDTs in one filesystem, one operation might
34  * include modifications in several MDTs. In such cases, clients
35  * send the RPC to the master MDT, then the operation is decomposed into
36  * object updates which will be dispatched to OSD or OSP. The local updates
37  * go to local OSD and the remote updates go to OSP. In OSP, these remote
38  * object updates will be packed into an update RPC, sent to the remote MDT
39  * and handled by Object Update Target (OUT).
40  *
41  * In DNE phase I, because of missing complete recovery solution, updates
42  * will be executed in order and synchronously.
43  *     1. The transaction is created.
44  *     2. In transaction declare, it collects and packs remote
45  *        updates (in osp_md_declare_xxx()).
46  *     3. In transaction start, it sends these remote updates
47  *        to remote MDTs, which will execute these updates synchronously.
48  *     4. In transaction execute phase, the local updates will be executed
49  *        synchronously.
50  *
51  * Author: Di Wang <di.wang@intel.com>
52  */
53
54 #define DEBUG_SUBSYSTEM S_MDS
55
56 #include <lustre_log.h>
57 #include "osp_internal.h"
58
59 static const char dot[] = ".";
60 static const char dotdot[] = "..";
61
62 /**
63  * Interpreter call for object creation
64  *
65  * Object creation interpreter, which will be called after creating
66  * the remote object to set flags and status.
67  *
68  * \param[in] env       execution environment
69  * \param[in] reply     update reply
70  * \param[in] req       ptlrpc update request for creating object
71  * \param[in] obj       object to be created
72  * \param[in] data      data used in this function.
73  * \param[in] index     index(position) of create update in the whole
74  *                      updates
75  * \param[in] rc        update result on the remote MDT.
76  *
77  * \retval              only return 0 for now
78  */
79 static int osp_object_create_interpreter(const struct lu_env *env,
80                                          struct object_update_reply *reply,
81                                          struct ptlrpc_request *req,
82                                          struct osp_object *obj,
83                                          void *data, int index, int rc)
84 {
85         if (rc != 0) {
86                 obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
87                 obj->opo_non_exist = 1;
88         }
89         return 0;
90 }
91
92 /**
93  * Implementation of dt_object_operations::do_declare_create
94  *
95  * Create the dt_update_request to track the update for this OSP
96  * in the transaction.
97  *
98  * \param[in] env       execution environment
99  * \param[in] dt        remote object to be created
100  * \param[in] attr      attribute of the created object
101  * \param[in] hint      creation hint
102  * \param[in] dof       creation format information
103  * \param[in] th        the transaction handle
104  *
105  * \retval              0 if preparation succeeds.
106  * \retval              negative errno if preparation fails.
107  */
108 int osp_md_declare_object_create(const struct lu_env *env,
109                                  struct dt_object *dt,
110                                  struct lu_attr *attr,
111                                  struct dt_allocation_hint *hint,
112                                  struct dt_object_format *dof,
113                                  struct thandle *th)
114 {
115         return osp_trans_update_request_create(th);
116 }
117
118 /**
119  * Implementation of dt_object_operations::do_create
120  *
121  * It adds an OUT_CREATE sub-request into the OUT RPC that will be flushed
122  * when the transaction stop, and sets necessary flags for created object.
123  *
124  * \param[in] env       execution environment
125  * \param[in] dt        object to be created
126  * \param[in] attr      attribute of the created object
127  * \param[in] hint      creation hint
128  * \param[in] dof       creation format information
129  * \param[in] th        the transaction handle
130  *
131  * \retval              0 if packing creation succeeds.
132  * \retval              negative errno if packing creation fails.
133  */
134 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
135                          struct lu_attr *attr, struct dt_allocation_hint *hint,
136                          struct dt_object_format *dof, struct thandle *th)
137 {
138         struct dt_update_request        *update;
139         int                             rc;
140
141         update = thandle_to_dt_update_request(th);
142         LASSERT(update != NULL);
143
144         rc = out_create_pack(env, &update->dur_buf,
145                              lu_object_fid(&dt->do_lu), attr, hint, dof,
146                              update->dur_batchid);
147         if (rc != 0)
148                 GOTO(out, rc);
149
150         rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), attr,
151                                         osp_object_create_interpreter);
152
153         if (rc < 0)
154                 GOTO(out, rc);
155
156         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
157         dt2osp_obj(dt)->opo_non_exist = 0;
158 out:
159         return rc;
160 }
161
162 /**
163  * Implementation of dt_object_operations::do_declare_ref_del
164  *
165  * Create the dt_update_request to track the update for this OSP
166  * in the transaction.
167  *
168  * \param[in] env       execution environment
169  * \param[in] dt        object to decrease the reference count.
170  * \param[in] th        the transaction handle of refcount decrease.
171  *
172  * \retval              0 if preparation succeeds.
173  * \retval              negative errno if preparation fails.
174  */
175 static int osp_md_declare_ref_del(const struct lu_env *env,
176                                   struct dt_object *dt, struct thandle *th)
177 {
178         return osp_trans_update_request_create(th);
179 }
180
181 /**
182  * Implementation of dt_object_operations::do_ref_del
183  *
184  * Add an OUT_REF_DEL sub-request into the OUT RPC that will be
185  * flushed when the transaction stop.
186  *
187  * \param[in] env       execution environment
188  * \param[in] dt        object to decrease the reference count
189  * \param[in] th        the transaction handle
190  *
191  * \retval              0 if packing ref_del succeeds.
192  * \retval              negative errno if packing fails.
193  */
194 static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
195                           struct thandle *th)
196 {
197         struct dt_update_request        *update;
198         int                             rc;
199
200         update = thandle_to_dt_update_request(th);
201         LASSERT(update != NULL);
202
203         rc = out_ref_del_pack(env, &update->dur_buf,
204                               lu_object_fid(&dt->do_lu),
205                               update->dur_batchid);
206         return rc;
207 }
208
209 /**
210  * Implementation of dt_object_operations::do_declare_ref_del
211  *
212  * Create the dt_update_request to track the update for this OSP
213  * in the transaction.
214  *
215  * \param[in] env       execution environment
216  * \param[in] dt        object on which to increase the reference count.
217  * \param[in] th        the transaction handle.
218  *
219  * \retval              0 if preparation succeeds.
220  * \retval              negative errno if preparation fails.
221  */
222 static int osp_md_declare_ref_add(const struct lu_env *env,
223                                   struct dt_object *dt, struct thandle *th)
224 {
225         return osp_trans_update_request_create(th);
226 }
227
228 /**
229  * Implementation of dt_object_operations::do_ref_add
230  *
231  * Add an OUT_REF_ADD sub-request into the OUT RPC that will be flushed
232  * when the transaction stop.
233  *
234  * \param[in] env       execution environment
235  * \param[in] dt        object on which to increase the reference count
236  * \param[in] th        the transaction handle
237  *
238  * \retval              0 if packing ref_add succeeds.
239  * \retval              negative errno if packing fails.
240  */
241 static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
242                           struct thandle *th)
243 {
244         struct dt_update_request        *update;
245         int                             rc;
246
247         update = thandle_to_dt_update_request(th);
248         LASSERT(update != NULL);
249
250         rc = out_ref_add_pack(env, &update->dur_buf,
251                               lu_object_fid(&dt->do_lu),
252                               update->dur_batchid);
253         return rc;
254 }
255
256 /**
257  * Implementation of dt_object_operations::do_ah_init
258  *
259  * Initialize the allocation hint for object creation, which is usually called
260  * before the creation, and these hints (parent and child mode) will be sent to
261  * the remote Object Update Target (OUT) and used in the object create process,
262  * same as OSD object creation.
263  *
264  * \param[in] env       execution environment
265  * \param[in] ah        the hint to be initialized
266  * \param[in] parent    the parent of the object
267  * \param[in] child     the object to be created
268  * \param[in] child_mode the mode of the created object
269  */
270 static void osp_md_ah_init(const struct lu_env *env,
271                            struct dt_allocation_hint *ah,
272                            struct dt_object *parent,
273                            struct dt_object *child,
274                            umode_t child_mode)
275 {
276         LASSERT(ah);
277
278         ah->dah_parent = parent;
279         ah->dah_mode = child_mode;
280 }
281
282 /**
283  * Implementation of dt_object_operations::do_declare_attr_get
284  *
285  * Create the dt_update_request to track the update for this OSP
286  * in the transaction.
287  *
288  * \param[in] env       execution environment
289  * \param[in] dt        object on which to set attributes
290  * \param[in] attr      attributes to be set
291  * \param[in] th        the transaction handle
292  *
293  * \retval              0 if preparation succeeds.
294  * \retval              negative errno if preparation fails.
295  */
296 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
297                             const struct lu_attr *attr, struct thandle *th)
298 {
299         return osp_trans_update_request_create(th);
300 }
301
302 /**
303  * Implementation of dt_object_operations::do_attr_set
304  *
305  * Set attributes to the specified remote object.
306  *
307  * Add the OUT_ATTR_SET sub-request into the OUT RPC that will be flushed
308  * when the transaction stop.
309  *
310  * \param[in] env       execution environment
311  * \param[in] dt        object to set attributes
312  * \param[in] attr      attributes to be set
313  * \param[in] th        the transaction handle
314  *
315  * \retval              0 if packing attr_set succeeds.
316  * \retval              negative errno if packing fails.
317  */
318 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
319                     const struct lu_attr *attr, struct thandle *th)
320 {
321         struct dt_update_request        *update;
322         int                             rc;
323
324         update = thandle_to_dt_update_request(th);
325         LASSERT(update != NULL);
326
327         rc = out_attr_set_pack(env, &update->dur_buf,
328                                lu_object_fid(&dt->do_lu), attr,
329                                update->dur_batchid);
330
331         return rc;
332 }
333
334 /**
335  * Implementation of dt_object_operations::do_read_lock
336  *
337  * osp_md_object_{read,write}_lock() will only lock the remote object in the
338  * local cache, which uses the semaphore (opo_sem) inside the osp_object to
339  * lock the object. Note: it will not lock the object in the whole cluster,
340  * which relies on the LDLM lock.
341  *
342  * \param[in] env       execution environment
343  * \param[in] dt        object to be locked
344  * \param[in] role      lock role from MDD layer, see mdd_object_role().
345  */
346 static void osp_md_object_read_lock(const struct lu_env *env,
347                                     struct dt_object *dt, unsigned role)
348 {
349         struct osp_object  *obj = dt2osp_obj(dt);
350
351         LASSERT(obj->opo_owner != env);
352         down_read_nested(&obj->opo_sem, role);
353
354         LASSERT(obj->opo_owner == NULL);
355 }
356
357 /**
358  * Implementation of dt_object_operations::do_write_lock
359  *
360  * Lock the remote object in write mode.
361  *
362  * \param[in] env       execution environment
363  * \param[in] dt        object to be locked
364  * \param[in] role      lock role from MDD layer, see mdd_object_role().
365  */
366 static void osp_md_object_write_lock(const struct lu_env *env,
367                                      struct dt_object *dt, unsigned role)
368 {
369         struct osp_object *obj = dt2osp_obj(dt);
370
371         down_write_nested(&obj->opo_sem, role);
372
373         LASSERT(obj->opo_owner == NULL);
374         obj->opo_owner = env;
375 }
376
377 /**
378  * Implementation of dt_object_operations::do_read_unlock
379  *
380  * Unlock the read lock of remote object.
381  *
382  * \param[in] env       execution environment
383  * \param[in] dt        object to be unlocked
384  */
385 static void osp_md_object_read_unlock(const struct lu_env *env,
386                                       struct dt_object *dt)
387 {
388         struct osp_object *obj = dt2osp_obj(dt);
389
390         up_read(&obj->opo_sem);
391 }
392
393 /**
394  * Implementation of dt_object_operations::do_write_unlock
395  *
396  * Unlock the write lock of remote object.
397  *
398  * \param[in] env       execution environment
399  * \param[in] dt        object to be unlocked
400  */
401 static void osp_md_object_write_unlock(const struct lu_env *env,
402                                        struct dt_object *dt)
403 {
404         struct osp_object *obj = dt2osp_obj(dt);
405
406         LASSERT(obj->opo_owner == env);
407         obj->opo_owner = NULL;
408         up_write(&obj->opo_sem);
409 }
410
411 /**
412  * Implementation of dt_object_operations::do_write_locked
413  *
414  * Test if the object is locked in write mode.
415  *
416  * \param[in] env       execution environment
417  * \param[in] dt        object to be tested
418  */
419 static int osp_md_object_write_locked(const struct lu_env *env,
420                                       struct dt_object *dt)
421 {
422         struct osp_object *obj = dt2osp_obj(dt);
423
424         return obj->opo_owner == env;
425 }
426
427 /**
428  * Implementation of dt_index_operations::dio_lookup
429  *
430  * Look up record by key under a remote index object. It packs lookup update
431  * into RPC, sends to the remote OUT and waits for the lookup result.
432  *
433  * \param[in] env       execution environment
434  * \param[in] dt        index object to lookup
435  * \param[out] rec      record in which to return lookup result
436  * \param[in] key       key of index which will be looked up
437  *
438  * \retval              1 if the lookup succeeds.
439  * \retval              negative errno if the lookup fails.
440  */
441 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
442                                struct dt_rec *rec, const struct dt_key *key)
443 {
444         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
445         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
446         struct dt_device        *dt_dev = &osp->opd_dt_dev;
447         struct dt_update_request   *update;
448         struct object_update_reply *reply;
449         struct ptlrpc_request      *req = NULL;
450         struct lu_fid              *fid;
451         int                        rc;
452         ENTRY;
453
454         /* Because it needs send the update buffer right away,
455          * just create an update buffer, instead of attaching the
456          * update_remote list of the thandle.
457          */
458         update = dt_update_request_create(dt_dev);
459         if (IS_ERR(update))
460                 RETURN(PTR_ERR(update));
461
462         rc = out_index_lookup_pack(env, &update->dur_buf,
463                                    lu_object_fid(&dt->do_lu), rec, key);
464         if (rc != 0) {
465                 CERROR("%s: Insert update error: rc = %d\n",
466                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
467                 GOTO(out, rc);
468         }
469
470         rc = osp_remote_sync(env, osp, update, &req);
471         if (rc < 0)
472                 GOTO(out, rc);
473
474         reply = req_capsule_server_sized_get(&req->rq_pill,
475                                              &RMF_OUT_UPDATE_REPLY,
476                                              OUT_UPDATE_REPLY_SIZE);
477         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
478                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
479                        dt_dev->dd_lu_dev.ld_obd->obd_name,
480                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
481                 GOTO(out, rc = -EPROTO);
482         }
483
484         rc = object_update_result_data_get(reply, lbuf, 0);
485         if (rc < 0)
486                 GOTO(out, rc);
487
488         if (lbuf->lb_len != sizeof(*fid)) {
489                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
490                        dt_dev->dd_lu_dev.ld_obd->obd_name,
491                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
492                        (int)lbuf->lb_len);
493                 GOTO(out, rc = -EINVAL);
494         }
495
496         fid = lbuf->lb_buf;
497         if (ptlrpc_rep_need_swab(req))
498                 lustre_swab_lu_fid(fid);
499         if (!fid_is_sane(fid)) {
500                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
501                        dt_dev->dd_lu_dev.ld_obd->obd_name,
502                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
503                 GOTO(out, rc = -EINVAL);
504         }
505
506         memcpy(rec, fid, sizeof(*fid));
507
508         GOTO(out, rc = 1);
509
510 out:
511         if (req != NULL)
512                 ptlrpc_req_finished(req);
513
514         dt_update_request_destroy(update);
515
516         return rc;
517 }
518
519 /**
520  * Implementation of dt_index_operations::dio_declare_insert
521  *
522  * Create the dt_update_request to track the update for this OSP
523  * in the transaction.
524  *
525  * \param[in] env       execution environment
526  * \param[in] dt        object for which to insert index
527  * \param[in] rec       record of the index which will be inserted
528  * \param[in] key       key of the index which will be inserted
529  * \param[in] th        the transaction handle
530  *
531  * \retval              0 if preparation succeeds.
532  * \retval              negative errno if preparation fails.
533  */
534 static int osp_md_declare_index_insert(const struct lu_env *env,
535                                        struct dt_object *dt,
536                                        const struct dt_rec *rec,
537                                        const struct dt_key *key,
538                                        struct thandle *th)
539 {
540         return osp_trans_update_request_create(th);
541 }
542
543 /**
544  * Implementation of dt_index_operations::dio_insert
545  *
546  * Add an OUT_INDEX_INSERT sub-request into the OUT RPC that will
547  * be flushed when the transaction stop.
548  *
549  * \param[in] env       execution environment
550  * \param[in] dt        object for which to insert index
551  * \param[in] rec       record of the index to be inserted
552  * \param[in] key       key of the index to be inserted
553  * \param[in] th        the transaction handle
554  * \param[in] ignore_quota quota enforcement for insert
555  *
556  * \retval              0 if packing index insert succeeds.
557  * \retval              negative errno if packing fails.
558  */
559 static int osp_md_index_insert(const struct lu_env *env,
560                                struct dt_object *dt,
561                                const struct dt_rec *rec,
562                                const struct dt_key *key,
563                                struct thandle *th,
564                                int ignore_quota)
565 {
566         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
567         struct dt_update_request *update = oth->ot_dur;
568         int                      rc;
569
570
571         rc = out_index_insert_pack(env, &update->dur_buf,
572                                    lu_object_fid(&dt->do_lu), rec, key,
573                                    update->dur_batchid);
574
575         return rc;
576 }
577
578 /**
579  * Implementation of dt_index_operations::dio_declare_delete
580  *
581  * Create the dt_update_request to track the update for this OSP
582  * in the transaction.
583  *
584  * \param[in] env       execution environment
585  * \param[in] dt        object for which to delete index
586  * \param[in] key       key of the index
587  * \param[in] th        the transaction handle
588  *
589  * \retval              0 if preparation succeeds.
590  * \retval              negative errno if preparation fails.
591  */
592 static int osp_md_declare_index_delete(const struct lu_env *env,
593                                        struct dt_object *dt,
594                                        const struct dt_key *key,
595                                        struct thandle *th)
596 {
597         return osp_trans_update_request_create(th);
598 }
599
600 /**
601  * Implementation of dt_index_operations::dio_delete
602  *
603  * Add an OUT_INDEX_DELETE sub-request into the OUT RPC that will
604  * be flushed when the transaction stop.
605  *
606  * \param[in] env       execution environment
607  * \param[in] dt        object for which to delete index
608  * \param[in] key       key of the index which will be deleted
609  * \param[in] th        the transaction handle
610  *
611  * \retval              0 if packing index delete succeeds.
612  * \retval              negative errno if packing fails.
613  */
614 static int osp_md_index_delete(const struct lu_env *env,
615                                struct dt_object *dt,
616                                const struct dt_key *key,
617                                struct thandle *th)
618 {
619         struct dt_update_request *update;
620         int                      rc;
621
622         update = thandle_to_dt_update_request(th);
623         LASSERT(update != NULL);
624
625         rc = out_index_delete_pack(env, &update->dur_buf,
626                                    lu_object_fid(&dt->do_lu), key,
627                                    update->dur_batchid);
628         return rc;
629 }
630
631 /**
632  * Implementation of dt_index_operations::dio_it.next
633  *
634  * Advance the pointer of the iterator to the next entry. It shares a similar
635  * internal implementation with osp_orphan_it_next(), which is being used for
636  * remote orphan index object. This method will be used for remote directory.
637  *
638  * \param[in] env       execution environment
639  * \param[in] di        iterator of this iteration
640  *
641  * \retval              0 if the pointer is advanced successfuly.
642  * \retval              1 if it reaches to the end of the index object.
643  * \retval              negative errno if the pointer cannot be advanced.
644  */
645 static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
646 {
647         struct osp_it           *it = (struct osp_it *)di;
648         struct lu_idxpage       *idxpage;
649         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
650         int                     rc;
651         ENTRY;
652
653 again:
654         idxpage = it->ooi_cur_idxpage;
655         if (idxpage != NULL) {
656                 if (idxpage->lip_nr == 0)
657                         RETURN(1);
658
659                 it->ooi_pos_ent++;
660                 if (ent == NULL) {
661                         it->ooi_ent =
662                               (struct lu_dirent *)idxpage->lip_entries;
663                         RETURN(0);
664                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
665                            it->ooi_pos_ent < idxpage->lip_nr) {
666                         ent = (struct lu_dirent *)(((char *)ent) +
667                                         le16_to_cpu(ent->lde_reclen));
668                         it->ooi_ent = ent;
669                         RETURN(0);
670                 } else {
671                         it->ooi_ent = NULL;
672                 }
673         }
674
675         rc = osp_it_next_page(env, di);
676         if (rc == 0)
677                 goto again;
678
679         RETURN(rc);
680 }
681
682 /**
683  * Implementation of dt_index_operations::dio_it.key
684  *
685  * Get the key at current iterator poisiton. These iteration methods
686  * (dio_it) will only be used for iterating the remote directory, so
687  * the key is the name of the directory entry.
688  *
689  * \param[in] env       execution environment
690  * \param[in] di        iterator of this iteration
691  *
692  * \retval              name of the current entry
693  */
694 static struct dt_key *osp_it_key(const struct lu_env *env,
695                                  const struct dt_it *di)
696 {
697         struct osp_it           *it = (struct osp_it *)di;
698         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
699
700         return (struct dt_key *)ent->lde_name;
701 }
702
703 /**
704  * Implementation of dt_index_operations::dio_it.key_size
705  *
706  * Get the key size at current iterator poisiton. These iteration methods
707  * (dio_it) will only be used for iterating the remote directory, so the key
708  * size is the name size of the directory entry.
709  *
710  * \param[in] env       execution environment
711  * \param[in] di        iterator of this iteration
712  *
713  * \retval              name size of the current entry
714  */
715
716 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
717 {
718         struct osp_it           *it = (struct osp_it *)di;
719         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
720
721         return (int)le16_to_cpu(ent->lde_namelen);
722 }
723
724 /**
725  * Implementation of dt_index_operations::dio_it.rec
726  *
727  * Get the record at current iterator position. These iteration methods
728  * (dio_it) will only be used for iterating the remote directory, so it
729  * uses lu_dirent_calc_size() to calculate the record size.
730  *
731  * \param[in] env       execution environment
732  * \param[in] di        iterator of this iteration
733  * \param[out] rec      the record to be returned
734  * \param[in] attr      attributes of the index object, so it knows
735  *                      how to pack the entry.
736  *
737  * \retval              only return 0 for now
738  */
739 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
740                                struct dt_rec *rec, __u32 attr)
741 {
742         struct osp_it           *it = (struct osp_it *)di;
743         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
744         size_t                  reclen;
745
746         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
747         memcpy(rec, ent, reclen);
748         return 0;
749 }
750
751 /**
752  * Implementation of dt_index_operations::dio_it.load
753  *
754  * Locate the iteration cursor to the specified position (cookie).
755  *
756  * \param[in] env       pointer to the thread context
757  * \param[in] di        pointer to the iteration structure
758  * \param[in] hash      the specified position
759  *
760  * \retval              positive number for locating to the exactly position
761  *                      or the next
762  * \retval              0 for arriving at the end of the iteration
763  * \retval              negative error number on failure
764  */
765 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
766                        __u64 hash)
767 {
768         struct osp_it   *it     = (struct osp_it *)di;
769         int              rc;
770
771         it->ooi_next = hash;
772         rc = osp_md_index_it_next(env, (struct dt_it *)di);
773         if (rc == 1)
774                 return 0;
775
776         if (rc == 0)
777                 return 1;
778
779         return rc;
780 }
781
782 const struct dt_index_operations osp_md_index_ops = {
783         .dio_lookup         = osp_md_index_lookup,
784         .dio_declare_insert = osp_md_declare_index_insert,
785         .dio_insert         = osp_md_index_insert,
786         .dio_declare_delete = osp_md_declare_index_delete,
787         .dio_delete         = osp_md_index_delete,
788         .dio_it     = {
789                 .init     = osp_it_init,
790                 .fini     = osp_it_fini,
791                 .get      = osp_it_get,
792                 .put      = osp_it_put,
793                 .next     = osp_md_index_it_next,
794                 .key      = osp_it_key,
795                 .key_size = osp_it_key_size,
796                 .rec      = osp_md_index_it_rec,
797                 .store    = osp_it_store,
798                 .load     = osp_it_load,
799                 .key_rec  = osp_it_key_rec,
800         }
801 };
802
803 /**
804  * Implementation of dt_object_operations::do_index_try
805  *
806  * Try to initialize the index API pointer for the given object. This
807  * is the entry point of the index API, i.e. we must call this method
808  * to initialize the index object before calling other index methods.
809  *
810  * \param[in] env       execution environment
811  * \param[in] dt        index object to be initialized
812  * \param[in] feat      the index feature of the object
813  *
814  * \retval              0 if the initialization succeeds.
815  * \retval              negative errno if the initialization fails.
816  */
817 static int osp_md_index_try(const struct lu_env *env,
818                             struct dt_object *dt,
819                             const struct dt_index_features *feat)
820 {
821         dt->do_index_ops = &osp_md_index_ops;
822         return 0;
823 }
824
825 /**
826  * Implementation of dt_object_operations::do_object_lock
827  *
828  * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT,
829  * which will lock the object in the global namespace.
830  *
831  * \param[in] env       execution environment
832  * \param[in] dt        object to be locked
833  * \param[out] lh       lock handle
834  * \param[in] einfo     enqueue information
835  * \param[in] policy    lock policy
836  *
837  * \retval              ELDLM_OK if locking the object succeeds.
838  * \retval              negative errno if locking fails.
839  */
840 static int osp_md_object_lock(const struct lu_env *env,
841                               struct dt_object *dt,
842                               struct lustre_handle *lh,
843                               struct ldlm_enqueue_info *einfo,
844                               ldlm_policy_data_t *policy)
845 {
846         struct ldlm_res_id      *res_id;
847         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
848         struct osp_device       *osp = dt2osp_dev(dt_dev);
849         struct ptlrpc_request   *req;
850         int                     rc = 0;
851         __u64                   flags = 0;
852         ldlm_mode_t             mode;
853
854         res_id = einfo->ei_res_id;
855         LASSERT(res_id != NULL);
856
857         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
858                                LDLM_FL_BLOCK_GRANTED, res_id,
859                                einfo->ei_type, policy,
860                                einfo->ei_mode, lh, 0);
861         if (mode > 0)
862                 return ELDLM_OK;
863
864         req = ldlm_enqueue_pack(osp->opd_exp, 0);
865         if (IS_ERR(req))
866                 RETURN(PTR_ERR(req));
867
868         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
869                               (const ldlm_policy_data_t *)policy,
870                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
871
872         ptlrpc_req_finished(req);
873
874         return rc == ELDLM_OK ? 0 : -EIO;
875 }
876
877 /**
878  * Implementation of dt_object_operations::do_object_unlock
879  *
880  * Cancel a lock of a remote object.
881  *
882  * \param[in] env       execution environment
883  * \param[in] dt        object to be unlocked
884  * \param[in] einfo     lock enqueue information
885  * \param[in] policy    lock policy
886  *
887  * \retval              Only return 0 for now.
888  */
889 static int osp_md_object_unlock(const struct lu_env *env,
890                                 struct dt_object *dt,
891                                 struct ldlm_enqueue_info *einfo,
892                                 ldlm_policy_data_t *policy)
893 {
894         struct lustre_handle    *lockh = einfo->ei_cbdata;
895
896         /* unlock finally */
897         ldlm_lock_decref(lockh, einfo->ei_mode);
898
899         return 0;
900 }
901
902 /**
903  * Implement OSP layer dt_object_operations::do_declare_destroy() interface.
904  *
905  * Create the dt_update_request to track the update for this OSP
906  * in the transaction.
907  *
908  * \param[in] env       pointer to the thread context
909  * \param[in] dt        pointer to the OSP layer dt_object to be destroyed
910  * \param[in] th        pointer to the transaction handler
911  *
912  * \retval              0 for success
913  * \retval              negative error number on failure
914  */
915 int osp_md_declare_object_destroy(const struct lu_env *env,
916                                struct dt_object *dt, struct thandle *th)
917 {
918         return osp_trans_update_request_create(th);
919 }
920
921 /**
922  * Implement OSP layer dt_object_operations::do_destroy() interface.
923  *
924  * Pack the destroy update into the RPC buffer, which will be sent
925  * to the remote MDT during transaction stop.
926  *
927  * It also marks the object as non-cached.
928  *
929  * \param[in] env       pointer to the thread context
930  * \param[in] dt        pointer to the OSP layer dt_object to be destroyed
931  * \param[in] th        pointer to the transaction handler
932  *
933  * \retval              0 for success
934  * \retval              negative error number on failure
935  */
936 int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt,
937                           struct thandle *th)
938 {
939         struct osp_object               *o = dt2osp_obj(dt);
940         struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
941         struct dt_update_request        *update;
942         int                             rc = 0;
943
944         ENTRY;
945         o->opo_non_exist = 1;
946
947         LASSERT(osp->opd_connect_mdt);
948         update = thandle_to_dt_update_request(th);
949         LASSERT(update != NULL);
950
951         rc = out_object_destroy_pack(env, &update->dur_buf,
952                        lu_object_fid(&dt->do_lu), update->dur_batchid);
953         if (rc != 0)
954                 RETURN(rc);
955
956         /* not needed in cache any more */
957         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
958
959         RETURN(rc);
960 }
961
962 struct dt_object_operations osp_md_obj_ops = {
963         .do_read_lock         = osp_md_object_read_lock,
964         .do_write_lock        = osp_md_object_write_lock,
965         .do_read_unlock       = osp_md_object_read_unlock,
966         .do_write_unlock      = osp_md_object_write_unlock,
967         .do_write_locked      = osp_md_object_write_locked,
968         .do_declare_create    = osp_md_declare_object_create,
969         .do_create            = osp_md_object_create,
970         .do_declare_ref_add   = osp_md_declare_ref_add,
971         .do_ref_add           = osp_md_ref_add,
972         .do_declare_ref_del   = osp_md_declare_ref_del,
973         .do_ref_del           = osp_md_ref_del,
974         .do_declare_destroy   = osp_md_declare_object_destroy,
975         .do_destroy           = osp_md_object_destroy,
976         .do_ah_init           = osp_md_ah_init,
977         .do_attr_get          = osp_attr_get,
978         .do_declare_attr_set  = osp_md_declare_attr_set,
979         .do_attr_set          = osp_md_attr_set,
980         .do_xattr_get         = osp_xattr_get,
981         .do_declare_xattr_set = osp_declare_xattr_set,
982         .do_xattr_set         = osp_xattr_set,
983         .do_declare_xattr_del = osp_declare_xattr_del,
984         .do_xattr_del         = osp_xattr_del,
985         .do_index_try         = osp_md_index_try,
986         .do_object_lock       = osp_md_object_lock,
987         .do_object_unlock     = osp_md_object_unlock,
988 };
989
990 /**
991  * Implementation of dt_body_operations::dbo_declare_write
992  *
993  * Create the dt_update_request to track the update for this OSP
994  * in the transaction.
995   *
996  * \param[in] env       execution environment
997  * \param[in] dt        object to be written
998  * \param[in] buf       buffer to write which includes an embedded size field
999  * \param[in] pos       offet in the object to start writing at
1000  * \param[in] th        transaction handle
1001  *
1002  * \retval              0 if preparation succeeds.
1003  * \retval              negative errno if preparation fails.
1004  */
1005 static ssize_t osp_md_declare_write(const struct lu_env *env,
1006                                     struct dt_object *dt,
1007                                     const struct lu_buf *buf,
1008                                     loff_t pos, struct thandle *th)
1009 {
1010         return osp_trans_update_request_create(th);
1011 }
1012
1013 /**
1014  * Implementation of dt_body_operations::dbo_write
1015  *
1016  * Pack the write object update into the RPC buffer, which will be sent
1017  * to the remote MDT during transaction stop.
1018  *
1019  * \param[in] env       execution environment
1020  * \param[in] dt        object to be written
1021  * \param[in] buf       buffer to write which includes an embedded size field
1022  * \param[in] pos       offet in the object to start writing at
1023  * \param[in] th        transaction handle
1024  * \param[in] ignore_quota quota enforcement for this write
1025  *
1026  * \retval              the buffer size in bytes if packing succeeds.
1027  * \retval              negative errno if packing fails.
1028  */
1029 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
1030                             const struct lu_buf *buf, loff_t *pos,
1031                             struct thandle *th, int ignore_quota)
1032 {
1033         struct dt_update_request  *update;
1034         ssize_t                   rc;
1035
1036         update = thandle_to_dt_update_request(th);
1037         LASSERT(update != NULL);
1038
1039         rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
1040                             buf, *pos, update->dur_batchid);
1041         if (rc < 0)
1042                 return rc;
1043
1044         /* XXX: how about the write error happened later? */
1045         *pos += buf->lb_len;
1046         return buf->lb_len;
1047 }
1048
1049 /* These body operation will be used to write symlinks during migration etc */
1050 struct dt_body_operations osp_md_body_ops = {
1051         .dbo_declare_write      = osp_md_declare_write,
1052         .dbo_write              = osp_md_write,
1053 };