Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public static Table tableToProto(
ColumnOrder.newBuilder().setOrderType(orderType).setInputRef(inputRefExpr).build();
builder.addColumnOrders(columnOrder);
}
builder.setIsAssociated(createMaterializedViewInfo.isAssociated());
}
builder.setIsAssociated(createMaterializedViewInfo.isAssociated());
}

return builder.build();
Expand Down
81 changes: 80 additions & 1 deletion rust/meta/src/dashboard/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,25 @@ <h1 class="flex-auto leading-6 font-semibold text-sky-500 dark:text-sky-400">
</div>
<div id="actors" class="w-full">
</div>
<div class="flex mt-5 mb-2 text-sm">
<h1 class="flex-auto leading-6 font-semibold text-sky-500 dark:text-sky-400">
Materialized Views
</h1>
<div class="form-check">
<input class="form-check-input" type="checkbox" value="" id="showAssociateMvCheckbox"
onchange="handleShowAssociateMvClick(this);">
<label class="form-check-label text-gray-500" for="showAssociateMvCheckbox">
Show Associated MV
</label>
</div>
</div>
<div class="form-check">
<select class="flex mb-2 leading-4 font-semibold text-sky-500 dark:text-sky-200" id="mvSelect"
onchange="handleMvSelectClick(this);">
</select>
</div>
<div id="mvFragments" class="w-full">
</div>
</div>
</div>
</body>
Expand All @@ -86,6 +105,9 @@ <h1 class="flex-auto leading-6 font-semibold text-sky-500 dark:text-sky-400">
<script>
/// Whether we should resolve mv on mv
let resolveMvOnMv = false
/// Value of the option currently selected in `#mvSelect`; "" is the value of the "All" option
let currentMv = ""
/// Ids of the materialized views currently offered in `#mvSelect`
let currentMvs = new Set()
/// Whether materialized views flagged `isAssociated` are offered in `#mvSelect`
let showAssociateMv = false

const cluster = (type, cluster) => `
<div class="p-6 max-w bg-white rounded-xl shadow-md flex flex-col space-y-1">
Expand All @@ -108,6 +130,18 @@ <h1 class="flex-auto leading-6 font-semibold text-sky-500 dark:text-sky-400">
<div id="message-${nodeId}" class="w-full flex flex-row"></div>
</div>`

/// Card markup for one materialized view: a header showing its id, an SVG canvas
/// (`#actor-<mvId>`) and a message row (`#message-<mvId>`). `actors` is accepted
/// but not used when building the markup.
const mvActors = (actors, mvId) => {
  // The leading empty entry reproduces the newline that opens the markup.
  const markupLines = [
    '',
    '<div class="p-6 max-w bg-white rounded-xl shadow-md flex flex-col space-y-1">',
    '<div class="flex flex-row items-center">',
    '<div class="w-3 h-3 flex-none bg-green-600 rounded-full mr-2"></div>',
    `<div class="text-xl font-medium text-black">#ID: ${mvId}</div>`,
    '</div>',
    `<div id="actor-wrapper-${mvId}" class="border border-gray-200 overflow-x-scroll">`,
    `<svg id="actor-${mvId}" class="h-full"></svg>`,
    '</div>',
    `<div id="message-${mvId}" class="w-full flex flex-row"></div>`,
    '</div>',
  ]
  return markupLines.join("\n")
}

/// Return a shallow copy of `actorNode` without its `input` property
const exprNode = (actorNode) => {
  const { input, ...withoutInput } = actorNode
  return withoutInput
}

Expand Down Expand Up @@ -364,7 +398,6 @@ <h1 class="flex-auto leading-6 font-semibold text-sky-500 dark:text-sky-400">
`<a target="_blank" rel="noopener noreferrer" class="text-sky-600" href="http://localhost:16680/search?service=compute&tags=%7B%22actor_id%22%3A%22${actorId}%22%2C%22msg%22%3A%22chunk%22%7D">Trace Message of Actor #${actorId}</a><br>
<a target="_blank" rel="noopener noreferrer" class="text-sky-600" href="http://localhost:16680/search?service=compute&tags=%7B%22actor_id%22%3A%22${actorId}%22%2C%22epoch%22%3A%22-1%22%7D">Trace Epoch "-1" of Actor #${actorId}</a><br>`
return `<div class="flex-1 overflow-x-scroll border border-gray-200 p-1">
<p class="text-xs font-mono">@${selectedActor.node.host.host}:${selectedActor.node.host.port}</p>
<p>${actorJump()}</p>
<p class="whitespace-pre text-xs font-mono">${JSON.stringify(node, null, 2)}</p>
</div>`
Expand Down Expand Up @@ -507,7 +540,53 @@ <h1 class="flex-auto leading-6 font-semibold text-sky-500 dark:text-sky-400">
loadActors()
}

/// Markup for one `<option>` of the MV dropdown.
///
/// `id` becomes the option value and `name` its label. Both are HTML-escaped
/// because the label is a table name returned by the API, and a name containing
/// `<`, `&` or quotes would otherwise be parsed as markup when appended.
const mvSelectOption = (id, name) => {
  const htmlEntities = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" }
  const escapeHtml = (text) => String(text).replace(/[&<>"']/g, (ch) => htmlEntities[ch])
  return `<option value="${escapeHtml(id)}">${escapeHtml(name)}</option>`
}

/// GET `url` and parse the body as JSON, rejecting on a non-2xx status so that an
/// error payload is never treated as data.
const fetchDashboardJson = (url) =>
  fetch(url).then(response => {
    if (!response.ok) {
      throw new Error(`GET ${url} failed with HTTP ${response.status}`)
    }
    return response.json()
  })

/// Rebuild the MV dropdown and the `currentMvs` id set from `/api/materialized_views`.
///
/// Each entry of the response is an `[id, table]` pair. Tables flagged `isAssociated`
/// are listed only while `showAssociateMv` is set.
///
/// Returns a promise that settles once the dropdown and `currentMvs` are up to date
/// (or the request has failed and been logged), so callers can sequence work that
/// depends on `currentMvs`.
const loadMvOptions = () =>
  fetchDashboardJson('/api/materialized_views')
    .then(mvs => {
      // Reset only once fresh data has arrived: clearing before the request let
      // overlapping calls append the same options twice.
      const select = $("#mvSelect")
      select.empty()
      select.append(mvSelectOption("", "All"))
      currentMvs.clear()
      mvs.forEach(([id, table]) => {
        if (showAssociateMv || !table.isAssociated) {
          currentMvs.add(id)
          select.append(mvSelectOption(id, table.tableName))
        }
      })
    })
    .catch(error => console.error("failed to load materialized views", error))

/// Re-render `#mvFragments` from `/api/fragments`.
///
/// Each entry of the response is a `[tableId, actors]` pair. With "All" selected
/// (`currentMv === ""`) every table whose id is in `currentMvs` is drawn; otherwise
/// only the table whose id matches `currentMv`.
///
/// Returns a promise that settles once rendering is done (or the request has failed
/// and been logged).
const loadFragments = () =>
  fetchDashboardJson('/api/fragments')
    .then(tables => {
      // Clear right before rendering so overlapping calls replace the content
      // instead of stacking duplicate cards.
      const container = $("#mvFragments")
      container.empty()
      tables.forEach(([tableId, actors]) => {
        if ((currentMv === "" && currentMvs.has(tableId)) || tableId.toString() === currentMv) {
          const mvActorData = { actors }
          container.append(mvActors(mvActorData, tableId))
          layoutStreamGraphs(mvActorData, tableId)
        }
      })
    })
    .catch(error => console.error("failed to load fragments", error))

/// `onchange` handler of `#mvSelect`: show only the selected MV ("" shows all listed MVs).
const handleMvSelectClick = (select) => {
  currentMv = select.value
  loadFragments()
}

/// `onchange` handler of the "Show Associated MV" checkbox: rebuild the dropdown with
/// the new filter, fall back to "All", then redraw the fragments.
const handleShowAssociateMvClick = (cb) => {
  showAssociateMv = cb.checked
  currentMv = ""
  // Fragments are filtered through `currentMvs`, which is only valid once the MV
  // list has arrived, so the two requests must run in sequence.
  loadMvOptions().then(loadFragments)
}

loadActors()
// Same ordering constraint as above: populate `currentMvs` before filtering fragments.
loadMvOptions().then(loadFragments)

</script>

Expand Down
41 changes: 40 additions & 1 deletion rust/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ use tower_http::add_extension::AddExtensionLayer;
use tower_http::cors::{self, CorsLayer};

use crate::cluster::StoredClusterManager;
use crate::storage::MetaStoreRef;
use crate::stream::FragmentManager;

/// State shared with the dashboard's HTTP handlers, which receive it wrapped in an
/// `Arc` through the `Service` extension.
#[derive(Clone)]
pub struct DashboardService {
/// Socket address of the dashboard (presumably the address the HTTP server binds to;
/// confirm against `serve`).
pub dashboard_addr: SocketAddr,
/// Cluster manager handle (presumably backing the `/api/clusters/:ty` endpoint; the
/// handler is not visible here).
pub cluster_manager: Arc<StoredClusterManager>,
/// Source of the per-table fragments returned by the `list_table_fragments` handler.
pub fragment_manager: Arc<FragmentManager>,

// TODO: replace with catalog manager.
/// Meta store from which the `list_materialized_views` handler lists tables.
pub meta_store_ref: MetaStoreRef,
/// NOTE(review): purpose not visible in this section; the name suggests a flag
/// recording whether test data has been loaded. Confirm against its users.
pub has_test_data: Arc<AtomicBool>,
}

Expand All @@ -29,13 +33,16 @@ pub type Service = Arc<DashboardService>;
mod handlers {
use axum::Json;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::ActorLocation;
use risingwave_pb::meta::{ActorLocation, Table};
use risingwave_pb::stream_plan::StreamActor;
use serde_json::json;

use super::*;

/// Error type returned by the dashboard handlers; a newtype around `anyhow::Error`.
pub struct DashboardError(anyhow::Error);
/// Result alias whose error type is fixed to [`DashboardError`].
pub type Result<T> = std::result::Result<T, DashboardError>;
/// Table identifier as it appears in the dashboard's JSON responses.
type TableId = i32;
/// A table id paired with the stream actors reported for that table.
type TableActors = (TableId, Vec<StreamActor>);

fn err(err: impl Into<anyhow::Error>) -> DashboardError {
DashboardError(err.into())
Expand Down Expand Up @@ -68,6 +75,21 @@ mod handlers {
Ok(result.into())
}

pub async fn list_materialized_views(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<(TableId, Table)>>> {
use crate::model::MetadataModel;

let materialized_views = Table::list(&srv.meta_store_ref)
.await
.map_err(err)?
.iter()
.filter(|t| t.is_materialized_view)
.map(|mv| (mv.table_ref_id.as_ref().unwrap().table_id, mv.clone()))
.collect::<Vec<_>>();
Ok(Json(materialized_views))
}

pub async fn list_actors(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<ActorLocation>>> {
Expand All @@ -87,6 +109,21 @@ mod handlers {

Ok(Json(actors))
}

/// Lists, for every table known to the fragment manager, the table's id together
/// with its actors.
///
/// # Errors
///
/// Returns a [`DashboardError`] when the fragment manager fails to list its tables.
pub async fn list_table_fragments(
    Extension(srv): Extension<Service>,
) -> Result<Json<Vec<TableActors>>> {
    let fragments = srv
        .fragment_manager
        .list_table_fragments()
        .await
        .map_err(err)?;

    let mut table_actors = Vec::with_capacity(fragments.len());
    for fragment in &fragments {
        table_actors.push((fragment.table_id().table_id(), fragment.actors()));
    }

    Ok(Json(table_actors))
}
}

impl DashboardService {
Expand All @@ -96,6 +133,8 @@ impl DashboardService {
let app = Router::new()
.route("/api/clusters/:ty", get(list_clusters))
.route("/api/actors", get(list_actors))
.route("/api/fragments", get(list_table_fragments))
.route("/api/materialized_views", get(list_materialized_views))
.route(
"/",
get(|| async { Html::from(include_str!("index.html")) }),
Expand Down
1 change: 1 addition & 0 deletions rust/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub async fn rpc_serve(
dashboard_addr,
cluster_manager: cluster_manager.clone(),
fragment_manager: fragment_manager.clone(),
meta_store_ref: env.meta_store_ref(),
has_test_data: Arc::new(std::sync::atomic::AtomicBool::new(false)),
};
tokio::spawn(dashboard_service.serve()); // TODO: join dashboard service back to local
Expand Down
8 changes: 8 additions & 0 deletions rust/meta/src/stream/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ impl FragmentManager {
}
}

/// Returns a clone of every `TableFragments` value currently held in
/// `self.table_fragments`.
pub async fn list_table_fragments(&self) -> Result<Vec<TableFragments>> {
    let mut fragments = Vec::new();
    for entry in self.table_fragments.iter() {
        fragments.push(entry.value().clone());
    }
    Ok(fragments)
}

pub async fn update_table_fragments(&self, table_fragment: TableFragments) -> Result<()> {
match self.table_fragments.entry(table_fragment.table_id()) {
Entry::Occupied(mut entry) => {
Expand Down