4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
26 * lustre/osp/osp_trans.c
28 * Author: Di Wang <di.wang@intel.com>
29 * Author: Fan, Yong <fan.yong@intel.com>
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "osp_internal.h"
/* Async-args payload stored in an update RPC: osp_unplug_async_update() and
 * osp_trans_trigger() set oaua_update before queueing the request, and
 * osp_async_update_interpret() reads it back when the reply is handled. */
36 struct osp_async_update_args {
/* the batched update request this RPC carries */
37 struct update_request *oaua_update;
/* One queued callback for a sub-request of a batched async update.
 * NOTE(review): the oaui_data member used elsewhere in this file is not
 * visible in this excerpt of the structure. */
40 struct osp_async_update_item {
/* linkage on update_request::ur_cb_items */
41 struct list_head oaui_list;
/* object handed to the interpreter; osp_async_update_item_fini() drops an
 * lu_object reference on it */
42 struct osp_object *oaui_obj;
/* callback, invoked as (env, reply, oaui_obj, oaui_data, index, rc) */
44 osp_async_update_interpterer_t oaui_interpterer;
/*
 * Set up a callback item for an async update on @obj.
 *
 * The visible statements take a reference on the object's lu_object,
 * initialise the list head and record @data and @interpterer in the item.
 *
 * NOTE(review): the allocation of the item, its failure handling, the
 * assignment of oaui_obj and the return statement are not visible in this
 * excerpt -- confirm against the full source.
 */
47 static struct osp_async_update_item *
48 osp_async_update_item_init(struct osp_object *obj, void *data,
49 osp_async_update_interpterer_t interpterer)
51 struct osp_async_update_item *oaui;
/* take an object reference; osp_async_update_item_fini() does the
 * lu_object_put() on oaui_obj */
57 lu_object_get(osp2lu_obj(obj));
/* start unlinked; osp_insert_async_update() adds it to ur_cb_items */
58 INIT_LIST_HEAD(&oaui->oaui_list);
60 oaui->oaui_data = data;
61 oaui->oaui_interpterer = interpterer;
/*
 * Tear down a callback item.
 *
 * The item must already be off any list (asserted); the lu_object reference
 * on oaui_obj is then dropped.
 * NOTE(review): releasing the item's memory is not visible in this excerpt.
 */
66 static void osp_async_update_item_fini(const struct lu_env *env,
67 struct osp_async_update_item *oaui)
/* callers unlink with list_del_init() before calling this */
69 LASSERT(list_empty(&oaui->oaui_list));
71 lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
/*
 * Reply callback for an async update RPC; installed as rq_interpret_reply by
 * osp_unplug_async_update() and osp_trans_trigger().
 *
 * Each item queued on update->ur_cb_items is unlinked, its interpreter is
 * called with a per-item result code, and the item is finalised. The update
 * request is destroyed afterwards.
 *
 * NOTE(review): the remaining parameters, some local declarations, the
 * else-branches and the return statement are not visible in this excerpt.
 */
75 static int osp_async_update_interpret(const struct lu_env *env,
76 struct ptlrpc_request *req,
79 struct update_reply *reply = NULL;
80 struct osp_async_update_args *oaua = arg;
81 struct update_request *update = oaua->oaua_update;
82 struct osp_async_update_item *oaui;
83 struct osp_async_update_item *next;
/* look at the reply buffer when the RPC succeeded, or when a reply message
 * is present even though rc is non-zero */
88 if (rc == 0 || req->rq_repmsg != NULL) {
89 reply = req_capsule_server_sized_get(&req->rq_pill,
/* a missing reply or an unexpected reply version is treated specially;
 * NOTE(review): the statement taken in that case is not visible here */
92 if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
95 count = reply->ur_count;
/* _safe iteration: every entry is removed from the list inside the loop */
100 list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
101 list_del_init(&oaui->oaui_list);
/* a per-item result exists only if index is within the reply count and the
 * corresponding reply buffer is non-empty */
102 if (index < count && reply->ur_lens[index] > 0) {
103 char *ptr = update_get_buf_internal(reply, index, NULL);
105 LASSERT(ptr != NULL);
/* the buffer is read as a little-endian 32-bit result code */
107 rc1 = le32_to_cpu(*(int *)ptr);
/* NOTE(review): the statement guarded by this test is not visible here */
110 if (unlikely(rc1 == 0))
114 oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
115 oaui->oaui_data, index, rc1);
116 osp_async_update_item_fini(env, oaui);
/* all callbacks have run; release the update request */
120 out_destroy_update_req(update);
/*
 * Send the batched async update @update through @osp.
 *
 * A request is prepared from update->ur_buf with out_prep_update_req().
 * In the first branch below every queued callback is invoked with a NULL
 * reply, index 0 and the current rc, then finalised, and the update request
 * is destroyed. Otherwise the update is stored in the request's async args,
 * osp_async_update_interpret() is installed as the reply interpreter and the
 * request is handed to ptlrpcd.
 *
 * NOTE(review): the condition selecting the first branch is not visible in
 * this excerpt (presumably a non-zero rc from request preparation --
 * confirm), nor is the return statement.
 */
125 int osp_unplug_async_update(const struct lu_env *env,
126 struct osp_device *osp,
127 struct update_request *update)
129 struct osp_async_update_args *args;
130 struct ptlrpc_request *req = NULL;
133 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
134 update->ur_buf, UPDATE_BUFFER_SIZE, &req);
136 struct osp_async_update_item *oaui;
137 struct osp_async_update_item *next;
/* no reply will arrive on this path: notify every waiter with rc directly */
139 list_for_each_entry_safe(oaui, next,
140 &update->ur_cb_items, oaui_list) {
141 list_del_init(&oaui->oaui_list);
142 oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
143 oaui->oaui_data, 0, rc);
144 osp_async_update_item_fini(env, oaui);
146 out_destroy_update_req(update);
/* the update must not be linked on any ur_list when it is sent this way */
148 LASSERT(list_empty(&update->ur_list));
150 args = ptlrpc_req_async_args(req);
151 args->oaua_update = update;
152 req->rq_interpret_reply = osp_async_update_interpret;
153 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
159 /* with osp::opd_async_requests_mutex held */
/*
 * Return the device's current batched async update request, starting from
 * osp->opd_async_requests; a new request created with
 * out_create_update_req() is stored back into osp->opd_async_requests.
 *
 * NOTE(review): the test that decides whether a new request is created, the
 * error check on the created request and the return statement are not
 * visible in this excerpt. A caller in this file checks the result with
 * PTR_ERR(), so failures are presumably reported as ERR_PTR values.
 */
160 struct update_request *
161 osp_find_or_create_async_update_request(struct osp_device *osp)
163 struct update_request *update = osp->opd_async_requests;
168 update = out_create_update_req(&osp->opd_dt_dev);
170 osp->opd_async_requests = update;
175 /* with osp::opd_async_requests_mutex held */
/*
 * Add one sub-request (@op on @obj, with @count buffers described by @lens
 * and @bufs) to the batched async @update and queue @interpterer / @data as
 * its completion callback.
 *
 * Visible flow: a callback item is created for @obj, then
 * out_insert_update() is called with the object's FID. In a branch whose
 * condition is not visible here, the current request is detached from the
 * device (opd_async_requests = NULL), the mutex is released around
 * osp_unplug_async_update() and re-taken, and a replacement request is
 * obtained with osp_find_or_create_async_update_request(). The callback item
 * is linked on update->ur_cb_items; osp_async_update_item_fini() is called
 * on a path whose condition is not visible.
 *
 * NOTE(review): the branch conditions, the retry mechanism, the "out" label
 * and the return statement are not visible in this excerpt.
 */
176 int osp_insert_async_update(const struct lu_env *env,
177 struct update_request *update, int op,
178 struct osp_object *obj, int count,
179 int *lens, const char **bufs, void *data,
180 osp_async_update_interpterer_t interpterer)
182 struct osp_async_update_item *oaui;
183 struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
187 oaui = osp_async_update_item_init(obj, data, interpterer);
192 rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
/* detach the request from the device before sending it */
195 osp->opd_async_requests = NULL;
/* the mutex is not held across the unplug call */
196 mutex_unlock(&osp->opd_async_requests_mutex);
198 rc = osp_unplug_async_update(env, osp, update);
199 mutex_lock(&osp->opd_async_requests_mutex);
203 update = osp_find_or_create_async_update_request(osp);
205 GOTO(out, rc = PTR_ERR(update));
/* the callback fires when the reply for this update is interpreted */
211 list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
217 osp_async_update_item_fini(env, oaui);
223 * If the transaction creation goes to OSP, it means the update
224 * in this transaction only includes remote UPDATE. It is only
225 * used by LFSCK right now.
/*
 * Create a transaction handle directly on the OSP device @d.
 *
 * Visible initialisation: the handle is tagged LCT_TX_HANDLE, its reference
 * count is set to 1 and th_alloc_size to sizeof(*th); the thandle_update
 * gets an empty tu_remote_update_list and tu_only_remote_trans = 1.
 * Allocation failure jumps to "out" with rc = -ENOMEM.
 *
 * NOTE(review): the allocations themselves, the second NULL check, the "out"
 * label and the return value are not visible in this excerpt.
 */
227 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
229 struct thandle *th = NULL;
230 struct thandle_update *tu = NULL;
234 if (unlikely(th == NULL))
235 GOTO(out, rc = -ENOMEM);
238 th->th_tags = LCT_TX_HANDLE;
239 atomic_set(&th->th_refc, 1);
240 th->th_alloc_size = sizeof(*th);
244 GOTO(out, rc = -ENOMEM);
246 INIT_LIST_HEAD(&tu->tu_remote_update_list);
/* marks the handle so that is_only_remote_trans() style checks see a
 * remote-only transaction -- NOTE(review): inferred from the field name and
 * its use in this file; confirm against the helper's definition */
247 tu->tu_only_remote_trans = 1;
/*
 * Send the updates collected in @update for transaction @th to the target
 * of @osp.
 *
 * For a remote-only transaction the update is unlinked from its ur_list,
 * a request is prepared from it and queued to ptlrpcd with
 * osp_async_update_interpret() as the reply interpreter, i.e. it is sent
 * asynchronously. Otherwise the update is sent with out_remote_sync().
 *
 * NOTE(review): the result checks between these statements, the condition
 * under which out_destroy_update_req() is called, the remaining arguments of
 * the truncated calls and the return statement are not visible in this
 * excerpt.
 */
262 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
263 struct update_request *update, struct thandle *th)
265 struct thandle_update *tu = th->th_update;
270 /* If the transaction only includes remote update, it should
271 * still be asynchronous */
272 if (is_only_remote_trans(th)) {
273 struct osp_async_update_args *args;
274 struct ptlrpc_request *req;
/* take the update off the transaction's list before it is handed to the
 * RPC layer */
276 list_del_init(&update->ur_list);
277 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
279 UPDATE_BUFFER_SIZE, &req);
281 args = ptlrpc_req_async_args(req);
282 args->oaua_update = update;
283 req->rq_interpret_reply =
284 osp_async_update_interpret;
285 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
287 out_destroy_update_req(update);
290 /* Before we support async update, the cross MDT transaction
291 * has to be synchronized */
293 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
/*
 * Transaction-start hook for the OSP device @dt.
 *
 * Looks up the update request that belongs to this device in the handle's
 * thandle_update and, unless the transaction is remote-only or is flagged
 * tu_sent_after_local_trans, sends it now via osp_trans_trigger().
 *
 * NOTE(review): the remaining parameters, the handling of a missing update
 * and the return statement are not visible in this excerpt.
 */
300 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
303 struct thandle_update *tu = th->th_update;
304 struct update_request *update;
310 /* Check whether there are updates related with this OSP */
311 update = out_find_update(tu, dt);
315 /* Note: some updates need to be sent before the local transaction,
316 * some need to be sent after the local transaction.
318 * If the transaction only includes remote updates, it will
319 * send updates to remote MDT in osp_trans_stop.
321 * If it is remote create, it will send the remote req after
322 * local transaction. i.e. create the object locally first,
323 * then insert the name entry.
325 * If it is remote unlink, it will send the remote req before
326 * the local transaction, i.e. delete the name entry remote
327 * first, then destroy the local object. */
328 if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
329 rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
/*
 * Transaction-stop hook for the OSP device @dt.
 *
 * Looks up the update request for this device. An update whose buffer holds
 * no sub-requests (ub_count == 0) is destroyed without being sent. For a
 * remote-only transaction the update is triggered only when th_result is 0;
 * for other transactions it is triggered when tu_sent_after_local_trans is
 * set. out_destroy_update_req() appears on each of these paths.
 *
 * NOTE(review): the remaining parameters, the else-branches, the exact
 * conditions guarding each out_destroy_update_req() call, the cleanup of the
 * handle itself and the return statement are not visible in this excerpt.
 */
334 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
337 struct thandle_update *tu = th->th_update;
338 struct update_request *update;
342 /* Check whether there are updates related with this OSP */
343 update = out_find_update(tu, dt);
/* nothing was recorded for this device */
344 if (update == NULL) {
345 if (!is_only_remote_trans(th))
/* an empty update buffer is released rather than sent */
350 if (update->ur_buf->ub_count == 0) {
351 out_destroy_update_req(update);
355 if (is_only_remote_trans(th)) {
/* only send when the transaction result so far is success */
356 if (th->th_result == 0) {
357 rc = osp_trans_trigger(env, dt2osp_dev(dt),
361 out_destroy_update_req(update);
/* updates flagged to go out after the local transaction are sent here */
364 if (tu->tu_sent_after_local_trans)
365 rc = osp_trans_trigger(env, dt2osp_dev(dt),
368 out_destroy_update_req(update);