Apache NiFi
What is Apache NiFi?
NiFi was built to automate the flow of data between systems.While the term dataflow
is used in a variety of contexts,
we use it here to mean the automated and managed flow of information between systems.
- Documentation
-
Link:
http://localhost:8080/nifi/
- workdir:
/var/opt/data/flat/ampdocker/files/nifi
Other: https://meritis.fr/bigdata/travailler-donnee-apache-nifi/
Install Nifi
version: '2'
services:
nifi:
image: apache/nifi:1.9.2
container_name: nifi
hostname: apachenifi
restart: unless-stopped
ports:
- "8080:8080"
#environments:
# - NIFI_WEB_HTTP_PORT='9090'
Add/Modify json keys
attributetojson with Jolt
- https://github.com/bazaarvoice/jolt#stock-transforms
- https://community.hortonworks.com/questions/106614/nifi-how-to-add-keyvalue-to-json-1.html
executescript code
- https://community.hortonworks.com/questions/75523/processor-for-replacing-json-values-dynamically-an.html
- https://stackoverflow.com/questions/47475464/json-transformation-with-data-manipulation-using-apache-nifi
Operation
Chain
[
{
"operation": "modify-default-beta",
"spec": {
"Basicat": "mon_application",
"pool": "mon pool",
"items": {
"*": {
"state_value": 100
}
}
}
}
]
[
{
"operation": "shift",
"spec": {
//"selfLink": "Pool",
"items": {
"*": {
//"session": "members.session"
// "name": "members.name",
// "state": "members.state",
//"session": "members.session"
"@name": "Members[#2].Name",
"@state": "Members[#2].State",
"@session": "Members[#2].Session"
}
},
"selfLink": "Pool"
}
},
{
"operation": "default",
"spec": {
"Basicat": "myapp",
"Platform": "myenv",
"timestamp": "temstamp"
}
}
]
{
"Members" : [ {
"Name" : "server1.example.com:443",
"State" : "up",
"Session" : "monitor-enabled"
}, {
"Name" : "server2.example.com:443",
"State" : "up",
"Session" : "monitor-enabled"
} ],
"Pool" : "https://localhost/mgmt/tm/ltm/pool/pool-443-HTTPS-FLIPAPI-UAT/members?ver=12.1.3",
"timestamp" : "temstamp",
"Basicat" : "myapp",
"Platform" : "myenv"
}
with jq
cat bigip.json | \
jq '{ Pool: .selfLink, MEMBERS: [ {name: .items[].name, state: .items[].state, value: (if .items[].state == "up" then 10 elif .items[].state == "down" then 5 else 0 end) } ] }' \
| jq -M '.MEMBERS |= unique_by({name, state})' > out.json
{
"Pool": null,
"MEMBERS": [
{
"name": "server1.example.com:443",
"state": "up",
"value": 10
},
{
"name": "server2.example.com:443",
"state": "up",
"value": 10
}
]
}
cat out.json | jq '{data:{result: "'$result'", error: 0, buildVariables:{BASICAT: "'$basicat'", PLATFORM: "'$platform'",HALF_PLATFORM: "'$halfplatform'", POOL: "'$pool'", MEMBER: .MEMBERS }}, "@timestamp": "'$(date -u "+%FT%T%2N")'"}'
{
"data": {
"result": "FAILURE",
"error": 0,
"buildVariables": {
"BASICAT": "dxp",
"PLATFORM": "uat",
"HALF_PLATFORM": "2",
"POOL": "pool-dxp-uat",
"MEMBER": [
{
"name": "server1.example.com:443",
"state": "up",
"value": 10
},
{
"name": "server2.example.com:443",
"state": "up",
"value": 10
}
]
}
},
"@timestamp": "2019-08-20T12:16:1440"
}