1#!/usr/bin/python3 2 3import sys 4import gi 5gi.require_version('Gst', '1.0') 6from gi.repository import GObject, Gst 7 8def bus_call(bus, msg, *args): 9 # print("BUSCALL", msg, msg.type, *args) 10 if msg.type == Gst.MessageType.EOS: 11 print("End-of-stream") 12 loop.quit() 13 return 14 elif msg.type == Gst.MessageType.ERROR: 15 print("GST ERROR", msg.parse_error()) 16 loop.quit() 17 return 18 return True 19 20saturation = -100 21def set_saturation(pipeline): 22 global saturation 23 if saturation <= 100: 24 print("Setting saturation to {0}".format(saturation)) 25 videosrc.set_property("saturation", saturation) 26 videosrc.set_property("annotation-text", "Saturation %d" % (saturation)) 27 else: 28 pipeline.send_event (Gst.Event.new_eos()) 29 return False 30 saturation += 10 31 return True 32 33 34if __name__ == "__main__": 35 GObject.threads_init() 36 # initialization 37 loop = GObject.MainLoop() 38 Gst.init(None) 39 40 pipeline = Gst.parse_launch ("rpicamsrc name=src ! video/x-h264,width=320,height=240 ! h264parse ! mp4mux ! filesink name=s") 41 if pipeline == None: 42 print ("Failed to create pipeline") 43 sys.exit(0) 44 45 # watch for messages on the pipeline's bus (note that this will only 46 # work like this when a GLib main loop is running) 47 bus = pipeline.get_bus() 48 bus.add_watch(0, bus_call, loop) 49 50 videosrc = pipeline.get_by_name ("src") 51 videosrc.set_property("saturation", saturation) 52 videosrc.set_property("annotation-mode", 1) 53 54 sink = pipeline.get_by_name ("s") 55 sink.set_property ("location", "test.mp4") 56 57 # this will call set_saturation every 1s 58 GObject.timeout_add(1000, set_saturation, pipeline) 59 60 # run 61 pipeline.set_state(Gst.State.PLAYING) 62 try: 63 loop.run() 64 except Exception as e: 65 print(e) 66 # cleanup 67 pipeline.set_state(Gst.State.NULL) 68 69