Avro Data Store based on Akka (persistence in progress)
The project is named after the Chinese word "刹那", a transliteration of the Sanskrit word "kṣaṇa", meaning "instant; split second".
 
- Each record is an actor (non-blocking)
- Akka sharding cluster (easy to scale-out)
- Locate field/value via XPath (since version 0.2.0)
- Scripting triggered by field update events (JDK 8 JavaScript engine - Nashorn)
- JPQL query on cluster
$ sbt run

Or

$ sbt clean compile dist
$ ls target/universal/
chana-0.2.0-SNAPSHOT.zip

Then copy chana-0.2.0-SNAPSHOT.zip somewhere and unzip it:

$ cd chana-0.2.0-SNAPSHOT/bin
$ ./chana

Schema: PersonInfo.avsc
{
  "type" : "record",
  "name" : "PersonInfo",
  "namespace" : "chana",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : "int"
  }, {
    "name" : "gender",
    "type" : {
      "type" : "enum",
      "name" : "GenderType",
      "symbols" : [ "Female", "Male", "Unknown" ]
    },
    "default" : "Unknown"
  }, {
    "name" : "emails",
    "type" : {
      "type" : "array",
      "items" : "string"
    }
  } ]
}

Try it:
$ cd src/test/resources/avsc
$ curl --data @PersonInfo.avsc 'http://127.0.0.1:8080/schema/put/personinfo'
OK
$ curl 'http://127.0.0.1:8080/personinfo/get/1'
{"name":"","age":0,"gender":"Unknown","emails":[]}
$ curl --data-binary @PersonInfo.update 'http://127.0.0.1:8080/personinfo/update/1'
OK
$ curl 'http://127.0.0.1:8080/personinfo/get/1'
{"name":"James Bond","age":60,"gender":"Unknown","emails":[]}
$ curl 'http://127.0.0.1:8080/personinfo/get/1/name'
"James Bond"

#### JPQL Simple test
$ cat jpql.put
echo 'SELECT COUNT(p.age), AVG(p.age) FROM PersonInfo p WHERE p.age >= 30 ORDER BY p.age' | \
curl -d @- 'http://127.0.0.1:8080/jpql/put/JPQL_NO_1'
$ ./jpql.put
#### Watching JPQL results
$ cat jpql.ask
while :
do
   sleep 1s
   curl 'http://127.0.0.1:8080/jpql/ask/JPQL_NO_1'
   printf '\n'
done
$ ./jpql.ask
#### Update a record with a random id; repeat to set more persons' ages to 80
$ cat jpql.update
RANDOM_ID=$RANDOM
JPQL="UPDATE PersonInfo p SET p.age = 80 WHERE p.id = '$RANDOM_ID'"
echo "$JPQL"
echo "$JPQL" | curl -d @- "http://127.0.0.1:8080/jpql"
$ ./jpql.update

A piece of JavaScript code, on_name.js, that is executed whenever the field PersonInfo.name is updated:
function onNameUpdated() {
    // `record` is the entity record after the update
    var age = record.get("age");
    what_is(age);

    // http_get returns a Future; sleep briefly before reading its value
    what_is(http_get);
    var http_get_result = http_get.apply("http://localhost:8080/ping", 5);
    java.lang.Thread.sleep(1000);
    what_is(http_get_result.value());

    // set the age of record 2 through the REST API
    what_is(http_post);
    var http_post_result = http_post.apply("http://localhost:8080/personinfo/put/2/age", "888", 5);
    java.lang.Thread.sleep(1000);
    what_is(http_post_result.value());

    // print every change recorded during this update
    for (var i = 0; i < binlogs.length; i++) {
        var binlog = binlogs[i];
        what_is(binlog.type());
        what_is(binlog.xpath());
        what_is(binlog.value());
    }
}

// print a value prefixed with the id of the updated entity
function what_is(value) {
    print(id + ": " + value);
}

onNameUpdated();

Try it:
$ curl --data-binary @on_name.js \
 'http://127.0.0.1:8080/personinfo/script/put/name/SCRIPT_NO_1'
OK
$ curl --data '"John"' 'http://127.0.0.1:8080/personinfo/put/1/name'
OK
$ curl 'http://127.0.0.1:8080/personinfo/get/2/age'
888

Schema: hatInventory.avsc
{
  "type" : "record",
  "name" : "hatInventory",
  "namespace" : "chana",
  "fields" : [ {
    "name" : "sku",
    "type" : "string",
    "default" : ""
  }, {
    "name" : "description",
    "type" : {
      "type" : "record",
      "name" : "hatInfo",
      "fields" : [ {
        "name" : "style",
        "type" : "string",
        "default" : ""
      }, {
        "name" : "size",
        "type" : "string",
        "default" : ""
      }, {
        "name" : "color",
        "type" : "string",
        "default" : ""
      }, {
        "name" : "material",
        "type" : "string",
        "default" : ""
      } ]
    },
    "default" : { }
  } ]
}

Try it:
$ cd src/test/resources/avsc
$ curl --data @hatInventory.avsc 'http://127.0.0.1:8080/schema/put/hatinv'
OK
$ curl 'http://127.0.0.1:8080/hatinv/get/1'
{"sku":"","description":{"style":"","size":"","color":"","material":""}}
$ curl --data '{"style":"classic","size":"Large","color":"Red"}' \
 'http://127.0.0.1:8080/hatinv/put/1/description'
OK
$ curl 'http://127.0.0.1:8080/hatinv/get/1'
{"sku":"","description":{"style":"classic","size":"Large","color":"Red","material":""}}
$ curl 'http://127.0.0.1:8080/hatinv/get/1/description'
{"style":"classic","size":"Large","color":"Red","material":""}
$ ab -c100 -n100000 -k 'http://127.0.0.1:8080/hatinv/get/1?benchmark_only=1024'

Environment:
- HOST: Dell Inc. PowerEdge R420/0VD50G
- CPU: 2 x Intel(R) Xeon(R) CPU E5-2420 v2 @ 2.20GHz (12 #core, 24 #HT)
- OS: CentOS Linux release 7.0.1406 (Core)

Simple GET/PUT REST-JSON Result:
- Simple GET: 169,437 [req#/sec] (mean)
- Simple PUT: 102,961 [req#/sec] (mean)
Details:
To run:
sbt run
cd src/test/resources/avsc
./bench-get.sh
./bench-put.sh

chana stores Avro records and exposes two groups of APIs:
- Primitive API (Scala/Java)
- RESTful API
Use XPath expressions to locate Avro fields. See XPath
case class PutSchema(entityName: String, schema: String, entityFullName: Option[String], idleTimeout: Duration)
case class RemoveSchema(entityName: String)

case class GetRecord(id: String)
case class GetRecordAvro(id: String)
case class GetRecordJson(id: String)
case class PutRecord(id: String, record: Record)
case class PutRecordJson(id: String, record: String)
case class GetField(id: String, field: String)
case class GetFieldAvro(id: String, field: String)
case class GetFieldJson(id: String, field: String)
case class PutField(id: String, field: String, value: Any)
case class PutFieldJson(id: String, field: String, value: String)
case class Select(id: String, path: String)
case class SelectAvro(id: String, path: String)
case class SelectJson(id: String, path: String)
case class Update(id: String, path: String, value: Any)
case class UpdateJson(id: String, path: String, value: String)

case class Insert(id: String, path: String, value: Any)
case class InsertJson(id: String, path: String, value: String)
case class InsertAll(id: String, path: String, values: List[_])
case class InsertAllJson(id: String, path: String, values: String)
case class Delete(id: String, path: String)
case class Clear(id: String, path: String)

case class PutScript(entity: String, field: String, id: String, script: String)
case class RemoveScript(entity: String, field: String, id: String)

POST /schema/put/$entityName?fullname=entity_full_name&timeout=1000
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
<SCHEMA_STRING>

parameters:
- fullname: the full name of the main entry; provide it when the schema contains multiple referenced complex types in a union. Optional
- timeout: idle timeout in milliseconds. Optional
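
As a rough illustration of how the two optional parameters fit into the request URL, the following JavaScript sketch builds the `/schema/put` URL. The helper name `schemaPutUrl` and its `baseUrl` and `options` arguments are hypothetical and not part of chana; only the path and the `fullname` and `timeout` query parameters come from the description above.

```javascript
// Hypothetical helper: builds the URL for POST /schema/put/$entityName.
// Only the path and the fullname/timeout parameters come from the docs above.
function schemaPutUrl(baseUrl, entityName, options) {
    var opts = options || {};
    var query = [];
    if (opts.fullname !== undefined) {
        // full name of the main entry (optional)
        query.push("fullname=" + encodeURIComponent(opts.fullname));
    }
    if (opts.timeout !== undefined) {
        // idle timeout in milliseconds (optional)
        query.push("timeout=" + encodeURIComponent(String(opts.timeout)));
    }
    var url = baseUrl + "/schema/put/" + encodeURIComponent(entityName);
    return query.length > 0 ? url + "?" + query.join("&") : url;
}

console.log(schemaPutUrl("http://127.0.0.1:8080", "personinfo", {
    fullname: "chana.PersonInfo",
    timeout: 1000
}));
```

Both parameters are left out of the URL when they are not supplied, which matches their optional status.
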

GET /schema/del/$entityName/
Host: status.wandoujia.com

GET /$entity/get/$id/
Host: status.wandoujia.com

GET /$entity/get/$id/$field
Host: status.wandoujia.com

POST /$entity/put/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
<JSON_STRING>

POST /$entity/put/$id/$field
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
<JSON_STRING>

POST /$entity/select/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
$xpath

POST /$entity/update/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
$xpath
<JSON_STRING>

Example (update array field -> record’s number field):
POST /account/update/12345/
BODY:
/chargeRecords[1].time
1234
Example (update map field -> record’s number field):
POST /account/update/12345/
BODY:
/devApps/@a/numBlackApps
1234

POST /$entity/insert/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
$xpath
<JSON_STRING>

Example (insert to array field):
POST /account/insert/12345/
BODY:
/chargeRecords
{"time": 4, "amount": -4.0}
Example (insert to map field):
POST /account/insert/12345/
BODY:
/devApps
{"h" : {"numBlackApps": 10}}

POST /$entity/insertall/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
$xpath
<JSON_STRING>

Example (insert to array field):
POST /account/insertall/12345/
BODY:
/chargeRecords
[{"time": -1, "amount": -5.0}, {"time": -2, "amount": -6.0}]
Example (insert to map field):
POST /account/insertall/12345/
BODY:
/devApps
{"g" : {}, "h" : {"numBlackApps": 10}}

POST /$entity/delete/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
$xpath

POST /$entity/clear/$id/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
$xpath

POST /$entity/script/put/$field/$scriptid/
Host: status.wandoujia.com
Content-Type: application/octet-stream
Content-Length: NNN
BODY:
<JavaScript>

GET /$entity/script/del/$field/$scriptid/
Host: status.wandoujia.com

Note:
- Replace $entity with the object/table/entity name
- Replace $id with the object id
- Replace $xpath with the actual XPath expression
- Put the $xpath and the JSON value(s) for update / insert / insertall in the POST body, separated by \n. Make sure the body is sent as binary, with Content-Type: application/octet-stream
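
The body layout described in the last note can be sketched in JavaScript as follows. The helper name `xpathBody` is hypothetical; the only thing taken from the notes above is the layout itself: the XPath on the first line, a single \n, then the JSON value(s).

```javascript
// Hypothetical helper: builds the POST body for update / insert / insertall.
// The XPath goes on the first line and the JSON value(s) on the second,
// separated by a single "\n".
function xpathBody(xpath, value) {
    if (xpath.indexOf("\n") !== -1) {
        throw new Error("xpath must not contain a newline");
    }
    return xpath + "\n" + JSON.stringify(value);
}

// Bodies modelled on the update and insert examples above.
console.log(xpathBody("/devApps/@a/numBlackApps", 1234));
console.log(xpathBody("/devApps", { h: { numBlackApps: 10 } }));
```

When sending such a body with curl, use `--data-binary` rather than `--data` for file input, since `--data` strips newlines from files and would merge the two lines.
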
The bindings accessible in a script:
def prepareBindings(onUpdated: OnUpdated) = {
  val bindings = new SimpleBindings
  bindings.put("http_get", http_get)
  bindings.put("http_post", http_post)
  bindings.put("id", onUpdated.id)
  bindings.put("record", record)
  bindings.put("binlogs", binlogs)
  bindings
}

Where,
- http_get: a function invoked via http_get.apply(url: CharSequence, timeoutInSeconds: Int); returns scala.concurrent.Future[Any]
- http_post: a function invoked via http_post.apply(url: CharSequence, body: CharSequence, timeoutInSeconds: Int); returns scala.concurrent.Future[Any]
- id: the id of this entity
- record: the entity record after the update
- binlogs: array of Binlog(s) produced by this update
  - binlogs[i].type(): -1 - Clear, 0 - Delete, 1 - Change, 2 - Insert
  - binlogs[i].xpath(): XPath location of the changed value
  - binlogs[i].value(): value - Change/Insert, keys - Delete, void - Clear
- The JavaScript code should perform all of its operations inside functions. Define local variables within a function and pass them between functions to share them, instead of defining global variables.
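
A minimal sketch of a script written in this functions-only style is shown below. It decodes the binlog type codes listed above and passes `id` and `binlogs` as arguments instead of relying on globals. The names `binlogTypeName`, `describeBinlogs`, `fakeBinlog` and `demo` are hypothetical; `fakeBinlog` is only a stand-in so the sketch can run outside chana, where `id` and `binlogs` would come from the bindings and `print` would replace `console.log`.

```javascript
// Maps the numeric binlog type to its name, per the list above.
// Number() is a precaution in case the engine hands over a boxed Java value.
function binlogTypeName(type) {
    switch (Number(type)) {
        case -1: return "Clear";
        case 0: return "Delete";
        case 1: return "Change";
        case 2: return "Insert";
        default: return "Unknown(" + type + ")";
    }
}

// Builds one line per binlog. Everything is passed in as arguments,
// so no global variables are needed.
function describeBinlogs(id, binlogs) {
    var lines = [];
    for (var i = 0; i < binlogs.length; i++) {
        var b = binlogs[i];
        lines.push(id + ": " + binlogTypeName(b.type()) + " at " + b.xpath());
    }
    return lines;
}

// Stand-in for a Binlog so the sketch runs outside chana (an assumption).
function fakeBinlog(type, xpath, value) {
    return {
        type: function () { return type; },
        xpath: function () { return xpath; },
        value: function () { return value; }
    };
}

// Demo with made-up data; inside chana, call describeBinlogs(id, binlogs).
function demo() {
    var logs = [
        fakeBinlog(1, "/name", "John"),
        fakeBinlog(2, "/emails", ["john@example.com"])
    ];
    return describeBinlogs("1", logs);
}

console.log(demo().join("\n"));
```
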